In [1]:
# !pip install sqlalchemy psycopg2-binary
import os, pandas as pd
from sqlalchemy import create_engine

pg_user = os.getenv("PGUSER", "postgres")
pg_pass = os.getenv("PGPASSWORD", "CSDBMS623")
pg_host = os.getenv("PGHOST", "localhost")
pg_port = os.getenv("PGPORT", "5432")
pg_db   = os.getenv("PGDATABASE", "SP500_ML")

engine = create_engine(f"postgresql+psycopg2://{pg_user}:{pg_pass}@{pg_host}:{pg_port}/{pg_db}")

# Use only the most recent membership date to define the current universe
universe = pd.read_sql_query("""
    SELECT DISTINCT UPPER(TRIM(latest_ticker)) AS latest_ticker
    FROM sp500_long_latest_profiles
    WHERE latest_ticker IS NOT NULL

""", engine)["latest_ticker"].tolist()

print("Universe size:", len(universe))

# Now run your Yahoo fetcher
#prices_df = fetch_prices_for_universe(universe, checkpoint_path="prices_checkpoint.parquete

Universe size: 679


# Cash Flow Pipeline (Quarterly Pull → TTM Sums → Signed YoY Growth)

This block fetches **quarterly cash-flow statements** from FMP, then builds:
1) **As-of validity windows** per filing,  
2) **TTM (trailing-twelve-months)** sums for key cash-flow lines, and  
3) **YoY growth on TTM** using a robust, **symmetric percent change** that behaves well at zeros/negatives.

---

## What it pulls and computes

### 1) Fetch (per ticker)
- Endpoint: `/api/v3/cash-flow-statement/{ticker}?period=quarter&limit=120&apikey=...`
- Robust GET with retries on 429/5xx.
- Cleans `symbol`, coerces `date` to datetime, **de-dupes** `(symbol, date)`, sorts by `["symbol","date"]`.

### 2) As-of validity windows
For each `symbol`, set:
- `date_start = date` (filing’s period end date)
- `date_end   = next(date)` within the same symbol; last row uses `2100-01-01`  
Interpretation: the row is valid on **[date_start, date_end)** for **as-of joins** to prices or weekly panels (no look-ahead).

### 3) TTM construction (strict 4-quarter sums)
Computed per `symbol` with `rolling(4, min_periods=4).sum()`:

- `operatingCashFlow_ttm`
- `freeCashFlow_ttm`
- `investingCashFlow_ttm` (from FMP’s field `netCashUsedForInvestingActivites` — note the **API typo**; code aliases the correctly spelled variant if present)

Values are integer-like and stored as pandas `Int64` (nullable).

### 4) YoY growth on TTM (robust / symmetric)
For each TTM series, compare quarter `t` to `t–4`:

\[
\text{growth}=\frac{2\,(a-b)}{|a|+|b|}
\quad\text{with}\quad a=\text{TTM}_t,\; b=\text{TTM}_{t-4}
\]

- Bounded in **[−2, +2]** (i.e., −200% to +200%)
- Handles zeros and sign flips gracefully (if `a=b=0`, returns 0.0)
- Columns produced:
  - `operatingCashFlow_ttm_growth`
  - `freeCashFlow_ttm_growth`
  - `investingCashFlow_ttm_growth`

---

## Key functions & parameters

- `fetch_cashflows_one(...)`: one-ticker pull; cleans, sorts, de-dupes.
- `add_cashflow_ttm_and_growth(df)`: adds **windows**, **TTM**, and **YoY growth**.
- `fetch_cashflows(tickers, ...)`: orchestrator for many tickers with batching and polite sleeps.
  - `batch_size`: tickers per batch (default 25)
  - `sleep_between_batches`: pause between batches (seconds)
  - `skip_errors=True`: continue on per-ticker API issues

---

## Output schema (core fields)

- Identity/timing: `symbol`, `date`, `date_start`, `date_end`
- Cash-flow lines (as provided by FMP), e.g.:
  - `operatingCashFlow`, `freeCashFlow`, `netCashUsedForInvestingActivites` (FMP spelling)
- TTM totals:
  - `operatingCashFlow_ttm`, `freeCashFlow_ttm`, `investingCashFlow_ttm`
- TTM YoY growth (symmetric):
  - `operatingCashFlow_ttm_growth`, `freeCashFlow_ttm_growth`, `investingCashFlow_ttm_growth`

---

## Usage notes & gotchas

- **Ordering matters**: calculations sort by `["symbol","date"]` before rolling/shift; storage order in SQL is arbitrary—always `ORDER BY` when querying.
- **API field spelling**: FMP sometimes returns `netCashUsedForInvestingActivites` (missing “i”). The code auto-aliases from the correct spelling if encountered.
- **Missing quarters**: if history is sparse, first 3 quarters have TTM = NA; growth needs both `t` and `t−4`, so early quarters get NA.
- **As-of joins**: when merging to prices/targets, join on **[date_start, date_end)** to avoid look-ahead.

---

## Quick sanity checks

```python
# No duplicate (symbol, date)
assert not df_cf.duplicated(["symbol","date"]).any()

# TTM available only after 4 quarters
assert df_cf.groupby("symbol")["operatingCashFlow_ttm"].apply(lambda s: s.first_valid_index() is not None).all()

# Growth computed only when both t and t-4 exist
na_rate = df_cf["operatingCashFlow_ttm_growth"].isna().mean()
print(f"NA share (growth needs t & t-4): {na_rate:.1%}")


In [3]:
# ===================== FMP Cash Flow Pull + TTM + Signed YoY Growth =====================
import time
from typing import Iterable, List, Optional, Tuple, Union
import requests
import pandas as pd

FMP_CF_BASE = "https://financialmodelingprep.com/api/v3/cash-flow-statement"
MAX_VALID_DATE = pd.Timestamp("2100-01-01")

# -------- Robust GET with simple retries --------
def _get_with_retries(
    session: requests.Session,
    url: str,
    params: dict,
    timeout: int = 30,
    max_retries: int = 4,
    base_sleep: float = 1.0,
):
    last = None
    for attempt in range(1, max_retries + 1):
        resp = session.get(url, params=params, timeout=timeout)
        if resp.status_code == 200:
            return resp
        last = resp
        if resp.status_code in (429, 500, 502, 503, 504):
            time.sleep(base_sleep * (2 ** (attempt - 1)))
            continue
        resp.raise_for_status()
    if last is not None:
        last.raise_for_status()
    raise RuntimeError("Request failed without response.")

# -------- Normalize API JSON --------
def _normalize_fmp_json(j):
    if isinstance(j, list):
        return j
    if isinstance(j, dict):
        for k in ("error", "Error", "message", "Note", "Error Message"):
            if k in j and isinstance(j[k], str):
                raise RuntimeError(f"API message: {j[k]}")
        for k in ("financials", "items", "data", "results", "financialStatements"):
            if k in j and isinstance(j[k], list):
                return j[k]
        return [j]
    raise RuntimeError(f"Unexpected JSON type: {type(j)}")

# -------- Signed symmetric YoY % change (robust to zeros/negatives) --------
def _signed_pct_change(curr: pd.Series, prev: pd.Series) -> pd.Series:
    a = pd.to_numeric(curr, errors="coerce").astype("Float64")
    b = pd.to_numeric(prev, errors="coerce").astype("Float64")
    denom = a.abs() + b.abs()
    out = pd.Series(pd.NA, index=a.index, dtype="Float64")
    valid = denom.notna() & (denom != 0)
    out.loc[valid] = 2.0 * (a[valid] - b[valid]) / denom[valid]
    both_zero = (a == 0) & (b == 0)
    out.loc[both_zero] = 0.0
    return out

# -------- One-ticker pull --------
def fetch_cashflows_one(
    ticker: str,
    api_key: str,
    session: Optional[requests.Session] = None,
    period: str = "quarter",
    limit: int = 120,
    timeout: int = 30,
) -> pd.DataFrame:
    """
    Pull FMP cash-flow statements for a single ticker and return a tidy DataFrame.
    """
    if session is None:
        session = requests.Session()
    url = f"{FMP_CF_BASE}/{ticker}"
    params = {"period": period, "apikey": api_key, "limit": limit}

    r = _get_with_retries(session, url, params, timeout=timeout)
    try:
        data = r.json()
    except ValueError as e:
        raise RuntimeError(f"Non-JSON response for {ticker}: {r.text[:300]}") from e

    records = _normalize_fmp_json(data)
    if not records:
        raise RuntimeError(f"No cash-flow records for {ticker}.")

    df = pd.DataFrame.from_records(records)

    # symbol/date clean
    if "symbol" in df.columns:
        df["symbol"] = df["symbol"].astype(str).str.upper()
        mask_missing = df["symbol"].isin(["", "None", "nan", "NaN"]) | df["symbol"].isna()
        df.loc[mask_missing, "symbol"] = ticker
    else:
        df["symbol"] = ticker.upper()

    if "date" not in df.columns:
        raise RuntimeError(f"'date' missing for {ticker}.")
    df["date"] = pd.to_datetime(df["date"], errors="coerce")

    # dedup/sort
    df = (
        df.dropna(subset=["date"])
          .drop_duplicates(subset=["symbol", "date"], keep="first")
          .sort_values(["symbol", "date"])
          .reset_index(drop=True)
    )
    return df

# -------- Add TTM + YoY growth (strict field names you provided) --------
def add_cashflow_ttm_and_growth(df: pd.DataFrame) -> pd.DataFrame:
    """
    Adds TTM sums and signed YoY growth for:
      operatingCashFlow, freeCashFlow, netCashUsedForInvestingActivites (FMP typo).
    Expects these columns to exist (as in your sample schema).
    """
    req = ["symbol", "date",
           "operatingCashFlow", "freeCashFlow", "netCashUsedForInvestingActivites"]
    missing = [c for c in req if c not in df.columns]
    if missing:
        # grace: accept correct 'Activities' spelling and alias if needed
        if "netCashUsedForInvestingActivites" in missing and "netCashUsedForInvestingActivities" in df.columns:
            df = df.copy()
            df["netCashUsedForInvestingActivites"] = df["netCashUsedForInvestingActivities"]
            missing = [c for c in req if c not in df.columns]
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

    df = df.copy()
    df["symbol"] = df["symbol"].astype(str).str.upper()
    df["date"]   = pd.to_datetime(df["date"], errors="coerce")
    df = (
        df.dropna(subset=["date"])
          .drop_duplicates(subset=["symbol","date"], keep="first")
          .sort_values(["symbol","date"])
          .reset_index(drop=True)
    )

    # SCD windows
    if "date_start" not in df.columns:
        df["date_start"] = df["date"]
    if "date_end" not in df.columns:
        df["date_end"] = df.groupby("symbol")["date"].shift(-1).fillna(MAX_VALID_DATE)

    # numeric bases
    for c in ["operatingCashFlow", "freeCashFlow", "netCashUsedForInvestingActivites"]:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # TTM (4-quarter rolling sums)
    df["operatingCashFlow_ttm"] = (
        df.groupby("symbol", group_keys=False)["operatingCashFlow"]
          .apply(lambda s: s.rolling(4, min_periods=4).sum().round(0).astype("Int64"))
    )
    df["freeCashFlow_ttm"] = (
        df.groupby("symbol", group_keys=False)["freeCashFlow"]
          .apply(lambda s: s.rolling(4, min_periods=4).sum().round(0).astype("Int64"))
    )
    df["investingCashFlow_ttm"] = (
        df.groupby("symbol", group_keys=False)["netCashUsedForInvestingActivites"]
          .apply(lambda s: s.rolling(4, min_periods=4).sum().round(0).astype("Int64"))
    )

    # YoY growth on TTM (shift 4 quarters)
    for ttm_col, out_col in [
        ("operatingCashFlow_ttm", "operatingCashFlow_ttm_growth"),
        ("freeCashFlow_ttm",      "freeCashFlow_ttm_growth"),
        ("investingCashFlow_ttm", "investingCashFlow_ttm_growth"),
    ]:
        prev = df.groupby("symbol")[ttm_col].shift(4)
        df[out_col] = _signed_pct_change(df[ttm_col], prev)

    return df

# -------- Multi-ticker orchestrator --------
def fetch_cashflows(
    tickers: Union[str, Iterable[str]],
    api_key: str,
    period: str = "quarter",
    limit: int = 120,
    batch_size: int = 25,
    sleep_between_batches: float = 1.0,
    timeout: int = 30,
    skip_errors: bool = True,
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Fetch cash-flow statements for many tickers, add TTM + YoY growth, return one DataFrame.
    """
    if isinstance(tickers, str):
        tickers = [tickers]
    tickers = [t.upper().strip() for t in tickers if str(t).strip()]
    session = requests.Session()

    frames: List[pd.DataFrame] = []
    skipped: List[Tuple[str, str]] = []

    for i in range(0, len(tickers), batch_size):
        batch = tickers[i:i + batch_size]
        if verbose:
            print(f"Batch {i//batch_size + 1}: {len(batch)} tickers "
                  f"({i+1}–{min(i+len(batch), len(tickers))} of {len(tickers)})")

        for t in batch:
            try:
                df_t = fetch_cashflows_one(
                    t, api_key=api_key, session=session,
                    period=period, limit=limit, timeout=timeout
                )
                frames.append(df_t)
            except Exception as e:
                if skip_errors:
                    skipped.append((t, str(e)))
                    if verbose:
                        print(f"  [skip] {t}: {e}")
                else:
                    raise

        if i + batch_size < len(tickers):
            time.sleep(sleep_between_batches)

    if not frames:
        if verbose:
            print("No successful cash-flow pulls.")
            if skipped:
                print(f"Skipped {len(skipped)} tickers. Examples: {skipped[:5]}")
        return pd.DataFrame()

    df_all = pd.concat(frames, ignore_index=True)

    # Add TTM + YoY growth (strict field names you provided)
    df_all = add_cashflow_ttm_and_growth(df_all)

    if verbose:
        print(f"✅ Success: {len(frames)} tickers; rows: {len(df_all)}")
        if skipped:
            print(f"⚠️ Skipped {len(skipped)} tickers.")

    return df_all

# -------------------------- Example usage --------------------------
if __name__ == "__main__":
    API_KEY = ""
    tickers = universe

    df_cf = fetch_cashflows(
        tickers=tickers,
        api_key=API_KEY,
        period="quarter",
        limit=120,
        batch_size=25,
        sleep_between_batches=15.0,
        skip_errors=True,
        verbose=True,
    )

    if not df_cf.empty:
        print(df_cf[[
            "symbol","date",
            "operatingCashFlow","operatingCashFlow_ttm","operatingCashFlow_ttm_growth",
            "freeCashFlow","freeCashFlow_ttm","freeCashFlow_ttm_growth",
            "netCashUsedForInvestingActivites","investingCashFlow_ttm","investingCashFlow_ttm_growth",
            "date_start","date_end"
        ]].head(12))

Batch 1: 25 tickers (1–25 of 679)
Batch 2: 25 tickers (26–50 of 679)
Batch 3: 25 tickers (51–75 of 679)
Batch 4: 25 tickers (76–100 of 679)
Batch 5: 25 tickers (101–125 of 679)
Batch 6: 25 tickers (126–150 of 679)
Batch 7: 25 tickers (151–175 of 679)
Batch 8: 25 tickers (176–200 of 679)
Batch 9: 25 tickers (201–225 of 679)
  [skip] EMC: No cash-flow records for EMC.
  [skip] EMCR: No cash-flow records for EMCR.
Batch 10: 25 tickers (226–250 of 679)
Batch 11: 25 tickers (251–275 of 679)
Batch 12: 25 tickers (276–300 of 679)
Batch 13: 25 tickers (301–325 of 679)
Batch 14: 25 tickers (326–350 of 679)
Batch 15: 25 tickers (351–375 of 679)
Batch 16: 25 tickers (376–400 of 679)
Batch 17: 25 tickers (401–425 of 679)
Batch 18: 25 tickers (426–450 of 679)
Batch 19: 25 tickers (451–475 of 679)
  [skip] NYX: No cash-flow records for NYX.
Batch 20: 25 tickers (476–500 of 679)
Batch 21: 25 tickers (501–525 of 679)
Batch 22: 25 tickers (526–550 of 679)
Batch 23: 25 tickers (551–575 of 679)
Batch 24:

In [6]:
# --- Quarterly YoY for cash-flow metrics (Qx(Y) vs Qx(Y-1), symmetric change) ---
# relies on your _signed_pct_change(...) helper

def add_cashflow_quarterly_yoy(df_cf: pd.DataFrame) -> pd.DataFrame:
    df = df_cf.copy()

    # Normalize / required cols
    df["symbol"] = df["symbol"].astype(str).str.upper().str.strip()
    # Period label (Q1..Q4)
    df["period_key"] = (
        df.get("period")
          .astype(str).str.upper().str.strip()
          if "period" in df.columns
          else ("Q" + pd.to_datetime(df["date"], errors="coerce").dt.quarter.astype("Int64").astype(str))
    )
    # Numeric year
    df["year_key"] = pd.to_numeric(df.get("calendarYear"), errors="coerce").astype("Int64")

    # FMP investing field often has a typo; alias if needed
    if "netCashUsedForInvestingActivites" not in df.columns and "netCashUsedForInvestingActivities" in df.columns:
        df["netCashUsedForInvestingActivites"] = df["netCashUsedForInvestingActivities"]

    need = ["operatingCashFlow", "freeCashFlow", "netCashUsedForInvestingActivites"]
    missing = [c for c in need if c not in df.columns]
    if missing:
        raise ValueError(f"Missing cash-flow columns: {missing}")

    # Keep real fiscal quarters only
    df = df[df["period_key"].isin(["Q1","Q2","Q3","Q4"])].copy()

    # Timestamps for dedupe priority
    for c in ["acceptedDate","fillingDate","date"]:
        if c in df.columns:
            df[c] = pd.to_datetime(df[c], errors="coerce")

    # Latest filing per (symbol, year, quarter)
    sort_cols = [c for c in ["acceptedDate","fillingDate","date"] if c in df.columns]
    latest = (df.sort_values(["symbol","year_key","period_key"] + sort_cols)
                .drop_duplicates(["symbol","year_key","period_key"], keep="last"))

    # Build prior-year table and align
    prev = latest[["symbol","period_key","year_key"] + need].rename(columns={
        "operatingCashFlow": "operatingCashFlow_prev",
        "freeCashFlow": "freeCashFlow_prev",
        "netCashUsedForInvestingActivites": "investingCF_prev",
    })
    prev["year_key"] = prev["year_key"] + 1  # so 2025 joins to 2024

    aligned = latest.merge(prev, on=["symbol","period_key","year_key"], how="left", validate="one_to_one")

    # Numeric coercion
    for c in ["operatingCashFlow","freeCashFlow","netCashUsedForInvestingActivites",
              "operatingCashFlow_prev","freeCashFlow_prev","investingCF_prev"]:
        if c in aligned.columns:
            aligned[c] = pd.to_numeric(aligned[c], errors="coerce").astype("Float64")

    # Symmetric QoQ YoY (same-quarter last year)
    aligned["operatingCashFlow_q_yoy"] = _signed_pct_change(
        aligned["operatingCashFlow"], aligned["operatingCashFlow_prev"]
    )
    aligned["freeCashFlow_q_yoy"] = _signed_pct_change(
        aligned["freeCashFlow"], aligned["freeCashFlow_prev"]
    )
    aligned["investingCashFlow_q_yoy"] = _signed_pct_change(
        aligned["netCashUsedForInvestingActivites"], aligned["investingCF_prev"]
    )

    # Merge back to original (many-to-one)
    out = df_cf.copy()
    out["symbol"] = out["symbol"].astype(str).str.upper().str.strip()
    out["period_key"] = (
        out.get("period").astype(str).str.upper().str.strip()
        if "period" in out.columns
        else ("Q" + pd.to_datetime(out["date"], errors="coerce").dt.quarter.astype("Int64").astype(str))
    )
    out["year_key"] = pd.to_numeric(out.get("calendarYear"), errors="coerce").astype("Int64")

    keep = ["symbol","period_key","year_key",
            "operatingCashFlow_q_yoy","freeCashFlow_q_yoy","investingCashFlow_q_yoy"]
    out = out.merge(aligned[keep], on=["symbol","period_key","year_key"], how="left", validate="many_to_one")

    return out.drop(columns=["period_key","year_key"])

# --- Run it on your cash-flow frame (with TTM already added) ---
df_cf = add_cashflow_quarterly_yoy(df_cf)

# Quick peek
print(df_cf[["symbol","calendarYear","period",
             "operatingCashFlow","operatingCashFlow_q_yoy",
             "freeCashFlow","freeCashFlow_q_yoy",
             "netCashUsedForInvestingActivites","investingCashFlow_q_yoy"]].head(12))


   symbol calendarYear period  operatingCashFlow  operatingCashFlow_q_yoy  \
0       A         2000     Q1       3.840000e+08                     <NA>   
1       A         2000     Q2      -3.800000e+07                     <NA>   
2       A         2000     Q3       6.900000e+07                     <NA>   
3       A         2000     Q4       4.230000e+08                     <NA>   
4       A         2001     Q1      -1.970000e+08                     -2.0   
5       A         2001     Q2       1.350000e+08                      2.0   
6       A         2001     Q3       2.910000e+08                 1.233333   
7       A         2001     Q4       1.273000e+09                 1.002358   
8       A         2002     Q1      -1.440000e+08                  0.31085   
9       A         2002     Q2      -9.500000e+07                     -2.0   
10      A         2002     Q3      -1.160000e+08                     -2.0   
11      A         2002     Q4      -1.430000e+08                     -2.0   

In [8]:
df_cf

Unnamed: 0,date,symbol,reportedCurrency,cik,fillingDate,acceptedDate,calendarYear,period,netIncome,depreciationAndAmortization,...,date_end,operatingCashFlow_ttm,freeCashFlow_ttm,investingCashFlow_ttm,operatingCashFlow_ttm_growth,freeCashFlow_ttm_growth,investingCashFlow_ttm_growth,operatingCashFlow_q_yoy,freeCashFlow_q_yoy,investingCashFlow_q_yoy
0,2000-01-31,A,USD,0001090872,2000-03-15,2000-03-15 00:00:00,2000,Q1,131000000.0,96000000.0,...,2000-04-30,,,,,,,,,
1,2000-04-30,A,USD,0001090872,2000-06-12,2000-06-12 00:00:00,2000,Q2,166000000.0,103000000.0,...,2000-07-31,,,,,,,,,
2,2000-07-31,A,USD,0001090872,2000-09-01,2000-09-01 00:00:00,2000,Q3,155000000.0,146000000.0,...,2000-10-31,,,,,,,,,
3,2000-10-31,A,USD,0001090872,2001-01-17,2001-01-17 00:00:00,2000,Q4,305000000.0,150000000.0,...,2001-01-31,838000000,14000000,-1117000000,,,,,,
4,2001-01-31,A,USD,0001090872,2001-03-19,2001-03-19 00:00:00,2001,Q1,154000000.0,139000000.0,...,2001-04-30,257000000,-649000000,-1779000000,,,,-2.0,-2.0,-1.2282
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
67418,2024-06-30,ZTS,USD,0001555280,2024-08-06,2024-08-06 12:41:47,2024,Q2,624000000.0,127000000.0,...,2024-09-30,2718000000,2103000000,-744000000,0.304855,0.482881,0.142322,0.931387,1.824289,-0.490566
67419,2024-09-30,ZTS,USD,0001555280,2024-11-04,2024-11-04 13:52:13,2024,Q3,692000000.0,121000000.0,...,2024-12-31,2945000000,2308000000,-651000000,0.290937,0.429474,0.427536,0.271045,0.300807,0.414254
67420,2024-12-31,ZTS,USD,0001555280,2025-02-13,2025-02-13 14:51:25,2024,Q4,581000000.0,123000000.0,...,2025-03-31,2953000000,2298000000,-315000000,0.226159,0.345496,0.846154,0.008879,-0.014409,2.0
67421,2025-03-31,ZTS,USD,0001555280,2025-05-06,2025-05-06 13:29:54,2025,Q1,631000000.0,119000000.0,...,2025-06-30,2945000000,2281000000,-359000000,0.204341,0.263458,0.633682,-0.013536,-0.038074,-0.287582


In [10]:
df_cf.to_csv('cashflowrec.csv')

In [12]:
# ===================== DB INGEST — Cash Flow (full + auto-add columns) =====================
import math, uuid
from typing import Sequence, Set, Dict
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.types import BigInteger, Float, Text, DateTime

# ---------- Configure ----------
PG_CONN_STR = "postgresql://postgres:CSDBMS623@localhost:5432/SP500_ML"
SCHEMA      = "public"
TABLE       = "cashflow_statements_q"
CHUNK_ROWS  = 25_000

# Canonical order (lowercase) — now includes quarterly YoY metrics
COL_ORDER_LC = [
    "symbol", "date", "date_start", "date_end",
    "reportedcurrency", "cik", "fillingdate", "accepteddate",
    "calendaryear", "period",
    "netincome", "depreciationandamortization", "deferredincometax",
    "stockbasedcompensation", "changeinworkingcapital", "accountsreceivables",
    "inventory", "accountspayables", "otherworkingcapital", "othernoncashitems",
    "netcashprovidedbyoperatingactivities", "investmentsinpropertyplantandequipment",
    "acquisitionsnet", "purchasesofinvestments", "salesmaturitiesofinvestments",
    "otherinvestingactivites", "netcashusedforinvestingactivites",
    "debtrepayment", "commonstockissued", "commonstockrepurchased", "dividendspaid",
    "otherfinancingactivites", "netcashusedprovidedbyfinancingactivities",
    "effectofforexchangesoncash", "netchangeincash", "cashatendofperiod",
    "cashatbeginningofperiod", "operatingcashflow", "capitalexpenditure", "freecashflow",
    "link", "finallink",
    # TTM + growth
    "operatingcashflow_ttm", "freecashflow_ttm", "investingcashflow_ttm",
    "operatingcashflow_ttm_growth", "freecashflow_ttm_growth", "investingcashflow_ttm_growth",
    # NEW: true quarterly YoY (Qx(Y) vs Qx(Y-1))
    "operatingcashflow_q_yoy", "freecashflow_q_yoy", "investingcashflow_q_yoy",
]

# SQLAlchemy dtype map for known columns
PG_DTYPE = {
    # timestamps
    "date": DateTime(timezone=False),
    "date_start": DateTime(timezone=False),
    "date_end": DateTime(timezone=False),

    # text
    "symbol": Text(),
    "reportedcurrency": Text(),
    "cik": Text(),
    "fillingdate": Text(),
    "accepteddate": Text(),
    "calendaryear": Text(),
    "period": Text(),
    "link": Text(),
    "finallink": Text(),

    # base numeric columns (DOUBLE PRECISION)
    "netincome": Float(),
    "depreciationandamortization": Float(),
    "deferredincometax": Float(),
    "stockbasedcompensation": Float(),
    "changeinworkingcapital": Float(),
    "accountsreceivables": Float(),
    "inventory": Float(),
    "accountspayables": Float(),
    "otherworkingcapital": Float(),
    "othernoncashitems": Float(),
    "netcashprovidedbyoperatingactivities": Float(),
    "investmentsinpropertyplantandequipment": Float(),
    "acquisitionsnet": Float(),
    "purchasesofinvestments": Float(),
    "salesmaturitiesofinvestments": Float(),
    "otherinvestingactivites": Float(),
    "netcashusedforinvestingactivites": Float(),
    "debtrepayment": Float(),
    "commonstockissued": Float(),
    "commonstockrepurchased": Float(),
    "dividendspaid": Float(),
    "otherfinancingactivites": Float(),
    "netcashusedprovidedbyfinancingactivities": Float(),
    "effectofforexchangesoncash": Float(),
    "netchangeincash": Float(),
    "cashatendofperiod": Float(),
    "cashatbeginningofperiod": Float(),
    "operatingcashflow": Float(),
    "capitalexpenditure": Float(),
    "freecashflow": Float(),

    # TTM sums
    "operatingcashflow_ttm": BigInteger(),
    "freecashflow_ttm": BigInteger(),
    "investingcashflow_ttm": BigInteger(),

    # TTM YoY growths
    "operatingcashflow_ttm_growth": Float(),
    "freecashflow_ttm_growth": Float(),
    "investingcashflow_ttm_growth": Float(),

    # NEW: Quarterly YoY growths
    "operatingcashflow_q_yoy": Float(),
    "freecashflow_q_yoy": Float(),
    "investingcashflow_q_yoy": Float(),
}

# ----- Engine -----
def _get_engine(conn_str: str) -> Engine:
    return create_engine(conn_str, pool_pre_ping=True)

# ----- Create table (first-time) -----
def ensure_table_and_indexes(engine: Engine, schema: str, table: str):
    cols_sql = """
      symbol                                   TEXT,
      date                                     TIMESTAMP,
      date_start                               TIMESTAMP,
      date_end                                 TIMESTAMP,
      reportedcurrency                         TEXT,
      cik                                      TEXT,
      fillingdate                              TEXT,
      accepteddate                             TEXT,
      calendaryear                             TEXT,
      period                                   TEXT,
      netincome                                DOUBLE PRECISION,
      depreciationandamortization              DOUBLE PRECISION,
      deferredincometax                        DOUBLE PRECISION,
      stockbasedcompensation                   DOUBLE PRECISION,
      changeinworkingcapital                   DOUBLE PRECISION,
      accountsreceivables                      DOUBLE PRECISION,
      inventory                                DOUBLE PRECISION,
      accountspayables                         DOUBLE PRECISION,
      otherworkingcapital                      DOUBLE PRECISION,
      othernoncashitems                        DOUBLE PRECISION,
      netcashprovidedbyoperatingactivities     DOUBLE PRECISION,
      investmentsinpropertyplantandequipment   DOUBLE PRECISION,
      acquisitionsnet                          DOUBLE PRECISION,
      purchasesofinvestments                   DOUBLE PRECISION,
      salesmaturitiesofinvestments             DOUBLE PRECISION,
      otherinvestingactivites                  DOUBLE PRECISION,
      netcashusedforinvestingactivites         DOUBLE PRECISION,
      debtrepayment                            DOUBLE PRECISION,
      commonstockissued                        DOUBLE PRECISION,
      commonstockrepurchased                   DOUBLE PRECISION,
      dividendspaid                            DOUBLE PRECISION,
      otherfinancingactivites                  DOUBLE PRECISION,
      netcashusedprovidedbyfinancingactivities DOUBLE PRECISION,
      effectofforexchangesoncash               DOUBLE PRECISION,
      netchangeincash                          DOUBLE PRECISION,
      cashatendofperiod                        DOUBLE PRECISION,
      cashatbeginningofperiod                  DOUBLE PRECISION,
      operatingcashflow                        DOUBLE PRECISION,
      capitalexpenditure                       DOUBLE PRECISION,
      freecashflow                             DOUBLE PRECISION,
      operatingcashflow_ttm                    BIGINT,
      freecashflow_ttm                         BIGINT,
      investingcashflow_ttm                    BIGINT,
      operatingcashflow_ttm_growth             DOUBLE PRECISION,
      freecashflow_ttm_growth                  DOUBLE PRECISION,
      investingcashflow_ttm_growth             DOUBLE PRECISION,
      operatingcashflow_q_yoy                  DOUBLE PRECISION,
      freecashflow_q_yoy                       DOUBLE PRECISION,
      investingcashflow_q_yoy                  DOUBLE PRECISION,
      link                                     TEXT,
      finallink                                TEXT
    """.strip()

    ddl = f'CREATE TABLE IF NOT EXISTS "{schema}"."{table}" ({cols_sql});'
    uq  = f"""
    DO $$
    BEGIN
      IF NOT EXISTS (
        SELECT 1 FROM pg_constraint WHERE conname = '{table}_symbol_date_key'
      ) THEN
        ALTER TABLE "{schema}"."{table}"
        ADD CONSTRAINT {table}_symbol_date_key UNIQUE (symbol, date);
      END IF;
    END$$;
    """
    idx1 = f'CREATE INDEX IF NOT EXISTS {table}_symbol_idx ON "{schema}"."{table}" (symbol);'
    idx2 = f'CREATE INDEX IF NOT EXISTS {table}_date_idx   ON "{schema}"."{table}" (date);'
    with engine.begin() as conn:
        conn.execute(text(ddl)); conn.execute(text(uq)); conn.execute(text(idx1)); conn.execute(text(idx2))

# ----- Schema migration helpers -----
def _existing_columns(engine: Engine, schema: str, table: str) -> Set[str]:
    sql = """
    SELECT lower(column_name) AS column_name
    FROM information_schema.columns
    WHERE table_schema = :schema AND table_name = :table
    """
    with engine.begin() as conn:
        rows = conn.execute(text(sql), {"schema": schema, "table": table}).fetchall()
    return {r[0] for r in rows}

def _sa_type_to_sql(t) -> str:
    if isinstance(t, BigInteger): return "BIGINT"
    if isinstance(t, Float):      return "DOUBLE PRECISION"
    if isinstance(t, Text):       return "TEXT"
    if isinstance(t, DateTime):   return "TIMESTAMP"
    return "DOUBLE PRECISION"

def _infer_sql_type_from_series(s: pd.Series) -> str:
    if pd.api.types.is_datetime64_any_dtype(s): return "TIMESTAMP"
    if pd.api.types.is_integer_dtype(s):        return "BIGINT"
    if pd.api.types.is_float_dtype(s):          return "DOUBLE PRECISION"
    return "TEXT"

def ensure_missing_columns(engine: Engine, schema: str, table: str, df: pd.DataFrame,
                           dtype_map: Dict[str, object]):
    have = _existing_columns(engine, schema, table)
    missing = [c for c in df.columns if c not in have]
    if not missing: return
    alters = []
    for c in missing:
        sa_t = dtype_map.get(c)
        sql_t = _sa_type_to_sql(sa_t) if sa_t is not None else _infer_sql_type_from_series(df[c])
        alters.append(f'ADD COLUMN IF NOT EXISTS {c} {sql_t}')
    with engine.begin() as conn:
        conn.execute(text(f'ALTER TABLE "{schema}"."{table}" ' + ", ".join(alters) + ";"))

# ----- Staging + merge -----
def _to_sql_staging(engine: Engine, df: pd.DataFrame, schema: str, staging: str):
    df.to_sql(
        name=staging, con=engine, schema=schema, if_exists="replace", index=False,
        dtype={c: PG_DTYPE.get(c) for c in df.columns if c in PG_DTYPE},
        chunksize=10_000, method=None,
    )

def _merge_from_staging(engine: Engine, schema: str, table: str, staging: str, cols: Sequence[str]):
    non_key_cols = [c for c in cols if c not in ("symbol", "date")]
    set_clause = ", ".join([f"{c}=EXCLUDED.{c}" for c in non_key_cols]) if non_key_cols else ""
    sql = f"""
    INSERT INTO "{schema}"."{table}" ({", ".join(cols)})
    SELECT {", ".join(cols)} FROM "{schema}"."{staging}"
    ON CONFLICT (symbol, date) DO UPDATE SET {set_clause};
    DROP TABLE "{schema}"."{staging}";
    """
    with engine.begin() as conn:
        conn.execute(text(sql))

# ----- Public API -----
def upsert_cashflows_postgres(
    df: pd.DataFrame,
    conn_str: str = PG_CONN_STR,
    schema: str = SCHEMA,
    table: str = TABLE,
    chunk_rows: int = CHUNK_ROWS,
):
    """
    Idempotent upsert with automatic schema migration:
    - lowercase columns
    - ensure table, unique(symbol,date), indexes
    - add any missing columns to existing table
    - stage & upsert in chunks (unique staging per chunk)
    """
    if df.empty:
        print("DataFrame is empty; nothing to ingest.")
        return

    df = df.copy()
    df.columns = df.columns.str.lower()

    # Ensure expected computed columns exist even if absent
    for c in [
        "operatingcashflow_ttm", "freecashflow_ttm", "investingcashflow_ttm",
        "operatingcashflow_ttm_growth", "freecashflow_ttm_growth", "investingcashflow_ttm_growth",
        "operatingcashflow_q_yoy", "freecashflow_q_yoy", "investingcashflow_q_yoy",
    ]:
        if c not in df.columns:
            df[c] = pd.NA

    # Column order: canonical first, then extras
    cols = [c for c in COL_ORDER_LC if c in df.columns] + [c for c in df.columns if c not in COL_ORDER_LC]
    df = df[cols]

    # Dates → tz-naive
    for d in ("date", "date_start", "date_end"):
        if d in df.columns:
            df[d] = pd.to_datetime(df[d], errors="coerce").dt.tz_localize(None)

    # BIGINTs: cast pandas nullable ints to Python int/None so NULLs land correctly
    for c, typ in PG_DTYPE.items():
        if c in df.columns and isinstance(typ, BigInteger):
            s = pd.to_numeric(df[c], errors="coerce")
            df[c] = s.where(~s.isna(), None).astype("object")

    # Floats: ensure numeric
    for c, typ in PG_DTYPE.items():
        if c in df.columns and isinstance(typ, Float):
            df[c] = pd.to_numeric(df[c], errors="coerce")

    engine = _get_engine(conn_str)
    ensure_table_and_indexes(engine, schema, table)
    ensure_missing_columns(engine, schema, table, df, PG_DTYPE)

    # Chunked stage + merge (unique staging per chunk)
    n = len(df); n_chunks = math.ceil(n / chunk_rows)
    for i in range(n_chunks):
        lo, hi = i * chunk_rows, min((i + 1) * chunk_rows, n)
        chunk = df.iloc[lo:hi].copy()
        staging = f"stg_{table}_{uuid.uuid4().hex[:8]}"
        _to_sql_staging(engine, chunk, schema, staging)
        _merge_from_staging(engine, schema, table, staging, chunk.columns.tolist())
        print(f"Upserted rows {lo}–{hi} / {n}")

    print("✅ Cash flow ingestion complete.")


In [14]:
# df_cf = (your full cash-flow DataFrame after adding TTM + TTM growth + quarterly YoY)
upsert_cashflows_postgres(df_cf)


Upserted rows 0–25000 / 67423
Upserted rows 25000–50000 / 67423
Upserted rows 50000–67423 / 67423
✅ Cash flow ingestion complete.
