In [None]:
# Cell 1
# !pip install --upgrade polars
# !pip install pyspark
!pip install "git+https://github.com/Erew42/NLP_Thesis.git@main"
!pip install polars
!pip install pyspark

Collecting git+https://github.com/Erew42/NLP_Thesis.git@main
  Cloning https://github.com/Erew42/NLP_Thesis.git (to revision main) to /tmp/pip-req-build-4idomdpz
  Running command git clone --filter=blob:none --quiet https://github.com/Erew42/NLP_Thesis.git /tmp/pip-req-build-4idomdpz
  Resolved https://github.com/Erew42/NLP_Thesis.git to commit a16abfce335fb2bfb56e4ffdaab847b518ff7bb3
  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
Collecting polars>=1.0.0 (from thesis_pkg==0.1.0)
  Downloading polars-1.37.1-py3-none-any.whl.metadata (10 kB)
Collecting polars-runtime-32==1.37.1 (from polars>=1.0.0->thesis_pkg==0.1.0)
  Downloading polars_runtime_32-1.37.1-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.5 kB)
Downloading polars-1.37.1-py3-none-any.whl (805 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m805.7/805.7 kB[0m [3

In [None]:
# Cell 2

import pandas as pd
import numpy as np
from pathlib import Path
import polars as pl
import gc
from pyspark import sql
import datetime as dt
import os
import polars.selectors as cs


In [None]:
# Cell 3
# Mount Google Drive when running on Colab; otherwise fall back to local file access.
try:
    from google.colab import drive
    drive.mount('/content/drive')
except ImportError:
    # Outside Colab, BASE_DIR should point to a local path; the cells below
    # still assume that /content/drive resolves.
    print("Not in Colab environment. Assuming local file access.")
else:
    print("Google Drive mounted successfully.")

Mounted at /content/drive
Google Drive mounted successfully.


In [None]:

# Cell 4

# --- Global Polars display options ---
# Print at most 50 rows and 50 columns for any DataFrame/LazyFrame repr.
# Each Config setter returns the Config class, so the two calls can be chained.
pl.Config.set_tbl_rows(50).set_tbl_cols(50)

In [None]:
# Cell 5

# load all files as polars lazyframes and print column names
from pathlib import Path
import shutil
import polars as pl

# ---- CONFIG ----
# Root of the CRSP/Compustat data on Drive. Raw exports live below parquet_data/,
# while derived_data/ sits directly under the root.
BASE_DIR2 = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/')
BASE_DIR = BASE_DIR2 / 'parquet_data'

# Sub-folders to inspect; 'derived_data' is resolved against BASE_DIR2, the rest against BASE_DIR.
TARGET_FOLDERS = [
    'documents-export-2025-3-19',
    'documents-export-2025-3-19(1)',
    'documents-export-2025-3-19(2)',
    'derived_data',
]

def schema_columns(parquet_path: Path) -> list[str]:
    """Return the column names of a parquet file without reading its row data.

    Uses a lazy scan plus ``LazyFrame.collect_schema()``, which resolves only the
    schema. Reading ``LazyFrame.schema`` directly emits a PerformanceWarning in
    recent Polars releases (visible in this cell's previous output).
    """
    return pl.scan_parquet(parquet_path).collect_schema().names()



for sub in TARGET_FOLDERS:
    # 'derived_data' hangs off BASE_DIR2; the raw export folders hang off BASE_DIR.
    BASE_DIR_temp = BASE_DIR2 if sub == 'derived_data' else BASE_DIR
    folder = BASE_DIR_temp / sub

    if not folder.exists():
        print(f"[warn] missing folder: {folder}")
        continue

    for p in folder.rglob('*.parquet'):
        # Report each parquet file (relative to its root) together with its columns.
        rel = p.relative_to(BASE_DIR_temp)
        try:
            cols = schema_columns(p)
        except Exception as e:
            print(f"! {rel} -> schema read failed: {e}")
            continue
        print(f"- {rel}")
        print(f"  columns ({len(cols)}): {cols}")


  return list(lf.schema.keys())


- documents-export-2025-3-19/indexdatadaily.parquet
  columns (14): ['KYGVKEYX', 'DATADATE', 'DVPSXD', 'NEWNUM', 'OLDNUM', 'PRCCD', 'PRCCDDIV', 'PRCHD', 'PRCLD', 'SPIHI', 'SPILO', 'SPINUMN', 'SPINUMO', 'SPIPRC']
- documents-export-2025-3-19/balancesheetquarterly.parquet
  columns (351): ['KYGVKEY', 'KEYSET', 'FYYYYQ', 'fyrq', 'lpermno', 'lpermco', 'LinkRangeTypeCd', 'ACCOQ', 'ACCOQ_DC', 'ACOMINCQ', 'ACOMINCQ_DC', 'ACOQ', 'ACOQ_DC', 'ACTQ', 'ALTOQ', 'ANCQ', 'ANOQ', 'ANOQ_DC', 'AOCIDERGLQ', 'AOCIDERGLQ_DC', 'AOCIDERGLQ_FN', 'AOCIOTHERQ', 'AOCIPENQ', 'AOCIPENQ_DC', 'AOCIPENQ_FN', 'AOCISECGLQ', 'AOCISECGLQ_DC', 'AOL2Q', 'AOL2Q_DC', 'AOQ', 'AOQ_DC', 'APQ', 'APQ_DC', 'APQ_FN1', 'AQPL1Q', 'AQPL1Q_DC', 'ATQ', 'ATQ_DC', 'ATQ_FN1', 'AUL3Q', 'AUL3Q_DC', 'BILLEXCEQ', 'CAPR1Q', 'CAPR1Q_DC', 'CAPR1Q_FN1', 'CAPR2Q', 'CAPR3Q', 'CAPR3Q_DC', 'CAPR3Q_FN1', 'CAPRTQ_FN', 'CAPSFTQ', 'CAPSFTQ_DC', 'CAPSQ', 'CAPSQ_DC', 'CAPSQ_FN1', 'CAQ', 'CAQ_DC', 'CEIEXBILLQ', 'CEIEXBILLQ_FN', 'CEQQ', 'CEQQ_DC', 'CHEQ', 'CH

In [None]:
# Cell 6

filenames = [
    'filingdates.parquet',
    'perioddescriptorquarterly.parquet',
    'perioddescriptorannual.parquet',
    'linkhistory.parquet',
    'linkfiscalperiodall.parquet',
    'companydescription.parquet',
    'securityheader.parquet',
    'securityheaderhistory.parquet',
    'balancesheetindustrialannual.parquet',
    'incomestatementindustrialannual.parquet',
    'cashflowannual.parquet',
    'companyhistory.parquet',
    'sfz_dp_dly.parquet',
    'sfz_ds_dly.parquet',
    'sfz_del.parquet',
    'sfz_dis.parquet',
    'sfz_nam.parquet',
    'sfz_hdr.parquet',
    # 'final_pit_data.parquet', # only read when it already exists (== code ran before already)
    # 'final_flagged_data.parquet', # only read when it already exists (== code ran before already)
]

BASE_DIR = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/parquet_data')

BASE_DIR2 = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/')
TARGET_FOLDERS = [
    'documents-export-2025-3-19',
    'documents-export-2025-3-19(1)',
    'documents-export-2025-3-19(2)',
    'derived_data',
]

# Set for O(1) membership checks while walking the folders.
wanted_files = set(filenames)

# dict: { "filingdates": LazyFrame, ... }
tables: dict[str, pl.LazyFrame] = {}

for sub in TARGET_FOLDERS:
    # 'derived_data' lives under BASE_DIR2; the raw export folders under BASE_DIR.
    BASE_DIR_temp = BASE_DIR2 if sub == 'derived_data' else BASE_DIR
    folder = BASE_DIR_temp / sub
    if not folder.exists():
        print(f"[warn] missing folder: {folder}")
        continue

    for p in folder.rglob("*.parquet"):
        if p.name not in wanted_files:
            continue

        key = p.stem  # e.g. "filingdates" from "filingdates.parquet"

        # The first copy encountered wins; folders are searched in TARGET_FOLDERS order.
        if key in tables:
            continue

        try:
            # Lazy scan only: no row data is read here.
            tables[key] = pl.scan_parquet(p)
            print(f"[ok] loaded {key} from {p.relative_to(BASE_DIR_temp)}")
        except Exception as e:
            print(f"[err] {p.relative_to(BASE_DIR_temp)} -> scan failed: {e}")

# Surface missing inputs here rather than as a KeyError in a later cell.
missing_tables = sorted(Path(f).stem for f in wanted_files if Path(f).stem not in tables)
if missing_tables:
    print(f"[warn] requested tables not found: {missing_tables}")

[ok] loaded balancesheetindustrialannual from documents-export-2025-3-19/balancesheetindustrialannual.parquet
[ok] loaded incomestatementindustrialannual from documents-export-2025-3-19/incomestatementindustrialannual.parquet
[ok] loaded cashflowannual from documents-export-2025-3-19/cashflowannual.parquet
[ok] loaded companyhistory from documents-export-2025-3-19/companyhistory.parquet
[ok] loaded linkfiscalperiodall from documents-export-2025-3-19/linkfiscalperiodall.parquet
[ok] loaded filingdates from documents-export-2025-3-19/filingdates.parquet
[ok] loaded perioddescriptorquarterly from documents-export-2025-3-19/perioddescriptorquarterly.parquet
[ok] loaded perioddescriptorannual from documents-export-2025-3-19/perioddescriptorannual.parquet
[ok] loaded companydescription from documents-export-2025-3-19/companydescription.parquet
[ok] loaded securityheaderhistory from documents-export-2025-3-19/securityheaderhistory.parquet
[ok] loaded linkhistory from documents-export-2025-3-1

In [None]:
from thesis_pkg.pipeline import load_tables, merge_histories, build_price_panel, attach_filings, add_final_returns, attach_ccm_links
from pathlib import Path

# 1. Config: Drive locations of the raw exports and of the derived pipeline outputs
BASE_DIR = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/parquet_data')
BASE_DIR2 = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/')
TEMP_DIR = BASE_DIR2 / 'derived_data'

# Table names (file stems without ".parquet") requested from load_tables.
filenames = {
    'filingdates',
    'perioddescriptorquarterly',
    'perioddescriptorannual',
    'linkhistory',
    'linkfiscalperiodall',
    'companydescription',
    'securityheader',
    'securityheaderhistory',
    'balancesheetindustrialannual',
    'incomestatementindustrialannual',
    'cashflowannual',
    'companyhistory',
    'sfz_dp_dly',
    'sfz_ds_dly',
    'sfz_del',
    'sfz_dis',
    'sfz_nam',
    'sfz_hdr',
}

# SEC form types passed to attach_filings; amendments are listed in both
# the "/A" and the "A" spelling because both conventions occur.
forms_10k_10q = [
    "10-K", "10-K/A", "10-KA",
    "10-Q", "10-Q/A", "10-QA",
    "10-KT", "10-KT/A",
    "10-QT", "10-QT/A",
    "10-K405",  # legacy form type
]

# 2. Load the requested tables
tables = load_tables([BASE_DIR], wanted=filenames)

# 3. Build the lazy panel: prices -> final returns -> filings -> CCM links.
# "1990-01-01" is presumably the panel start date — confirm in build_price_panel.
price_lf = build_price_panel(
    tables['sfz_ds_dly'],
    tables['sfz_dp_dly'],
    tables['sfz_del'],
    "1990-01-01",
)
price_returns_lf = add_final_returns(price_lf)
price_filings_lf = attach_filings(price_returns_lf, tables['filingdates'], forms_10k_10q)
price_linked_lf = attach_ccm_links(price_filings_lf, tables['linkhistory'])

# 4. Merge security/company histories; writes under TEMP_DIR and returns the output path.
final_path = merge_histories(
    price_linked_lf,
    tables['securityheaderhistory'],
    tables['companyhistory'],
    output_dir=TEMP_DIR,
    verbose=True,
)

print(f"Done! Saved to {final_path}")

[merge_histories][sec_concat] cols_left=43 cols_right=43 left_only=[] right_only=[] dtype_mismatch=[]
[merge_histories][comp_concat] cols_left=49 cols_right=49 left_only=[] right_only=[] dtype_mismatch=[]
Done! Saved to /content/drive/MyDrive/Data_LM/CRSP_Compustat_data/derived_data/final_flagged_data.parquet


In [None]:
from thesis_pkg.pipeline import load_tables, merge_histories, build_price_panel, attach_filings, add_final_returns, attach_ccm_links, attach_company_description
from pathlib import Path
import polars as pl

# 1. Config
BASE_DIR = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/parquet_data')
BASE_DIR2 = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/')
TEMP_DIR = BASE_DIR2 / 'derived_data'

# Input: merged panel from the previous cell; output: the same panel plus company-description columns.
final_path = TEMP_DIR / 'final_flagged_data.parquet'
final_path2 = TEMP_DIR / 'final_flagged_data_compdesc_added.parquet'

# Only the company description table is needed here.
filenames = {'companydescription'}

# 2. Load, enrich lazily and stream the result to disk
tables = load_tables([BASE_DIR], wanted=filenames)
final_lf = attach_company_description(pl.scan_parquet(final_path), tables['companydescription'])
final_lf.sink_parquet(final_path2, compression='zstd')


In [None]:
from thesis_pkg.pipeline import sink_exact_firm_sample_from_parquet
from pathlib import Path
import polars as pl

BASE_DIR2 = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/')
TEMP_DIR = BASE_DIR2 / 'derived_data'
final_path = TEMP_DIR / 'final_flagged_data_compdesc_added.parquet'

# Sampling parameters: fraction of firms (KYPERMNO) to keep and the RNG seed.
sample_frac = 0.1
sample_seed = 42
if not 0 < sample_frac <= 1:
    raise ValueError(f"sample_frac must be in (0, 1], got {sample_frac}")

# Percentage label for the output file names. ':g' avoids float truncation
# (int(0.29 * 100) == 28) while still rendering 0.1 as "10".
pct_label = f"{sample_frac * 100:g}"
sample_path = final_path.with_name(f"{final_path.stem}.sample_{pct_label}pct_seed{sample_seed}_compdesc.parquet")
firm_list_path = final_path.with_name(f"{final_path.stem}.firms_{pct_label}pct_seed{sample_seed}_compdesc.parquet")

out = sink_exact_firm_sample_from_parquet(
    full_source=final_path,
    sample_path=sample_path,
    firm_col="KYPERMNO",
    frac=sample_frac,
    seed=sample_seed,
    compression="zstd",
    save_firm_list_path=firm_list_path,
)

# Compute both summary statistics in a single lazy pass over the written sample.
sample_stats = (
    pl.scan_parquet(sample_path)
    .select(
        pl.len().alias("rows"),
        pl.col("KYPERMNO").n_unique().alias("unique_firms"),
    )
    .collect()
)
n = sample_stats.item(0, "rows")
n_firms = sample_stats.item(0, "unique_firms")
print({"rows": n, "unique_firms": n_firms, "path": str(out)})


{'rows': 6934534, 'unique_firms': 3041, 'path': '/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/derived_data/final_flagged_data_compdesc_added.sample_10pct_seed42_compdesc.parquet'}


In [None]:
# Load the sampled panel and print its first 10 rows.
sample_df = pl.read_parquet(sample_path)
sample_df.head(10).pipe(print)

shape: (10, 53)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬───┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ KYP ┆ CAL ┆ PRC ┆ RET ┆ RET ┆ TCA ┆ VOL ┆ BID ┆ ASK ┆ BID ┆ ASK ┆ NUM ┆ OPE ┆ dat ┆ DLS ┆ DLS ┆ DLP ┆ DLA ┆ DLR ┆ DLR ┆ FIN ┆ FIN ┆ FIN ┆ SRC ┆ FIL ┆ … ┆ LIN ┆ LIN ┆ LIN ┆ LIN ┆ is_ ┆ val ┆ lin ┆ lin ┆ KYG ┆ KYG ┆ lin ┆ HIS ┆ HCH ┆ HTP ┆ HEX ┆ HIS ┆ HCH ┆ HCI ┆ HSI ┆ HNA ┆ HGS ┆ CIK ┆ n_c ┆ HCI ┆ CIK │
│ ERM ┆ DT  ┆ --- ┆ --- ┆ X   ┆ P   ┆ --- ┆ LO  ┆ HI  ┆ --- ┆ --- ┆ TRD ┆ NPR ┆ a_s ┆ TDT ┆ TCD ┆ RC  ┆ MT  ┆ ET  ┆ ETX ┆ AL_ ┆ AL_ ┆ AL_ ┆ TYP ┆ EDA ┆   ┆ KDT ┆ KEN ┆ KTY ┆ KPR ┆ can ┆ id_ ┆ k_r ┆ k_r ┆ VKE ┆ VKE ┆ k_q ┆ T_S ┆ GEN ┆ CI  ┆ CNT ┆ T_S ┆ GEN ┆ K   ┆ C   ┆ ICS ┆ UBI ┆ _de ┆ ik_ ┆ K_1 ┆ _fi │
│ NO  ┆ --- ┆ f64 ┆ f64 ┆ --- ┆ --- ┆ f64 ┆ --- ┆ --- ┆ f64 ┆ f64 

In [None]:
from thesis_pkg.pipeline import DataStatus
import polars as pl

# Reload the sample so this cell does not depend on the previous one having run.
sample_df = pl.read_parquet(source=sample_path)

def decode_data_status(lf: pl.LazyFrame | pl.DataFrame) -> pl.LazyFrame | pl.DataFrame:
    """
    Expand the ``data_status`` bitmask into one boolean column per DataStatus flag.

    For every member produced by iterating ``DataStatus`` whose value is non-zero,
    adds a column ``status_<flag name, lower-cased>`` that is True when *all* bits
    of that flag are set in ``data_status``.

    Parameters
    ----------
    lf : LazyFrame or DataFrame containing a ``data_status`` column.

    Returns
    -------
    The same kind of frame as the input, with the extra ``status_*`` columns.
    """
    # Cast once and compare against explicitly typed UInt64 literals (data_status is
    # UInt64 in the pipeline output), so a high-bit flag value is never inferred as
    # an overflowing Int64 literal. Mirrors has_flag()/STATUS_DTYPE in the diagnostics cell.
    status = pl.col("data_status").cast(pl.UInt64)

    flag_exprs = []
    for flag in DataStatus:
        if flag.value <= 0:  # Skip DataStatus.NONE
            continue
        mask = pl.lit(flag.value, dtype=pl.UInt64)
        flag_exprs.append(((status & mask) == mask).alias(f"status_{flag.name.lower()}"))

    return lf.with_columns(flag_exprs)

# Decode the bitmask on the sample and print the first rows of the generated status_* columns only.
df_decoded = decode_data_status(sample_df)
print(df_decoded.head(5).select(pl.col("^status_.*$")))

shape: (5, 20)
┌─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┬─────┐
│ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta ┆ sta │
│ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus ┆ tus │
│ _ha ┆ _ha ┆ _ha ┆ _ha ┆ _ha ┆ _ha ┆ _ha ┆ _co ┆ _co ┆ _co ┆ _co ┆ _co ┆ _co ┆ _se ┆ _se ┆ _se ┆ _se ┆ _se ┆ _se ┆ _ha │
│ s_r ┆ s_b ┆ s_r ┆ s_p ┆ s_d ┆ s_d ┆ s_d ┆ mph ┆ mph ┆ mph ┆ mph ┆ mph ┆ mph ┆ chi ┆ chi ┆ chi ┆ chi ┆ chi ┆ chi ┆ s_c │
│ et  ┆ idl ┆ etx ┆ rc  ┆ lre ┆ lre ┆ lpr ┆ ist ┆ ist ┆ ist ┆ ist ┆ ist ┆ ist ┆ st_ ┆ st_ ┆ st_ ┆ st_ ┆ st_ ┆ st_ ┆ omp │
│ --- ┆ o   ┆ --- ┆ --- ┆ t   ┆ tx  ┆ c   ┆ _ca ┆ _at ┆ _ma ┆ _no ┆ _va ┆ _st ┆ can ┆ att ┆ mat ┆ no_ ┆ val ┆ sta ┆ _de │
│ boo ┆ --- ┆ boo ┆ boo ┆ --- ┆ --- ┆ --- ┆ n_a ┆ tem ┆ tch ┆ _ma ┆ lid ┆ ale ┆ _at ┆ emp ┆ che ┆ mat ┆ id  ┆ le  ┆ sc  │
│ l   ┆ b

In [None]:
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
import polars as pl


# Bitmask dtype of `data_status`; keep in sync with the pipeline's output schema.
STATUS_DTYPE = pl.UInt64


def has_flag(status: pl.Expr, flag_int: int) -> pl.Expr:
    """Boolean expression: True where ``status`` shares at least one bit with ``flag_int``.

    ``status`` is cast to ``STATUS_DTYPE`` and ``flag_int`` is built as a literal of
    that dtype before the bitwise AND. For single-bit flags "any bit" equals "the bit".
    """
    mask = pl.lit(flag_int, dtype=STATUS_DTYPE)
    overlap = status.cast(STATUS_DTYPE) & mask
    return overlap != 0


@dataclass(frozen=True)
class DiagnosticsConfig:
    """Immutable column configuration consumed by ``run_pipeline_diagnostics``.

    Attributes
    ----------
    required_cols : columns that must exist in the panel; diagnostics raise otherwise.
    cik_cols : CIK-related columns of interest (not read by run_pipeline_diagnostics).
    """

    # Columns the diagnostics cannot run without.
    required_cols: tuple[str, ...] = ("KYPERMNO", "CALDT", "data_status", "KYGVKEY_final")
    # CIK-related columns, listed for reference.
    cik_cols: tuple[str, ...] = ("HCIK", "HCIK_10", "CIK_desc_10", "n_cik_desc_uniq", "CIK_final")


def run_pipeline_diagnostics(
    source: str | Path | pl.LazyFrame,
    *,
    cfg: DiagnosticsConfig = DiagnosticsConfig(),
    # Provide these as integers: int(DataStatus.HAS_RET), etc.
    flags: dict[str, int] | None = None,
) -> dict[str, pl.DataFrame]:
    """
    Compute diagnostic tables for a pipeline output panel.

    Parameters
    ----------
    source : path to a parquet file (scanned lazily) or an existing LazyFrame.
    cfg : ``cfg.required_cols`` must all be present in the schema.
    flags : optional mapping of DataStatus flag name -> integer value (e.g. from
        ``build_flags_dict``). When given, bitmask/column consistency counts are added.

    Returns
    -------
    dict of DataFrames keyed by diagnostic name.
    - Always present: "schema", "key_health", "duplicate_rows_by_key".
    - Present depending on ``flags`` and on which columns exist:
      "flag_column_consistency", "cik_quality", "cik_source_gain",
      "cik_conflicts", "cik_conflicts_by_ambiguity", "ambiguous_gvkeys_top".

    Raises
    ------
    ValueError
        If any of ``cfg.required_cols`` is missing.
    """
    lf = source if isinstance(source, pl.LazyFrame) else pl.scan_parquet(str(source))

    schema = lf.collect_schema()
    missing_required = [c for c in cfg.required_cols if c not in schema]
    if missing_required:
        raise ValueError(f"Missing required columns: {missing_required}")

    out: dict[str, pl.DataFrame] = {}

    # ---- 1) Schema overview
    out["schema"] = pl.DataFrame(
        {"column": list(schema.keys()), "dtype": [str(schema[k]) for k in schema.keys()]}
    )

    # ---- 2) Key health: duplicates/nulls
    out["key_health"] = (
        lf.select([
            pl.len().alias("n_rows"),
            pl.col("KYPERMNO").is_null().sum().alias("n_null_permno"),
            pl.col("CALDT").is_null().sum().alias("n_null_caldt"),
            pl.col("KYGVKEY_final").is_null().sum().alias("n_null_gvkey_final"),
        ]).collect()
    )

    # One summary row over all (KYPERMNO, CALDT) keys that occur more than once.
    out["duplicate_rows_by_key"] = (
        lf.group_by(["KYPERMNO", "CALDT"])
          .agg(pl.len().alias("n"))
          .filter(pl.col("n") > 1)
          .select([
              pl.len().alias("n_duplicate_keys"),
              pl.col("n").sum().alias("n_rows_in_duplicate_keys"),
              pl.col("n").max().alias("max_dupe_multiplicity"),
          ])
          .collect()
    )

    # ---- 3) Bitmask <-> column spot-check consistency (if flags provided)
    if flags is not None:
        status = pl.col("data_status").cast(STATUS_DTYPE)

        checks = []

        def add_flag_check(flag_name: str, col: str):
            # Count rows where the flag and the column's null-ness disagree in either direction.
            if col not in schema:
                return
            f = flags[flag_name]
            checks.extend([
                (has_flag(status, f) & pl.col(col).is_null()).sum().alias(f"{flag_name}_set_but_{col}_null"),
                ((~has_flag(status, f)) & pl.col(col).is_not_null()).sum().alias(f"{flag_name}_notset_but_{col}_nonnull"),
            ])

        # Common ones
        for nm, col in [
            ("HAS_RET", "RET"),
            ("HAS_RETX", "RETX"),
            ("HAS_PRC", "PRC"),
            ("HAS_DLRET", "DLRET"),
            ("HAS_DLRETX", "DLRETX"),
            ("HAS_DLPRC", "DLPRC"),
        ]:
            if nm in flags:
                add_flag_check(nm, col)

        # History matched flags vs their join outputs (if present)
        if "SECHIST_MATCHED" in flags and "HIST_START_DATE_SEC" in schema:
            checks.append(
                (has_flag(status, flags["SECHIST_MATCHED"]) & pl.col("HIST_START_DATE_SEC").is_null())
                .sum().alias("SECHIST_MATCHED_set_but_HIST_START_DATE_SEC_null")
            )

        if "COMPHIST_MATCHED" in flags and "HIST_START_DATE_COMP" in schema:
            checks.append(
                (has_flag(status, flags["COMPHIST_MATCHED"]) & pl.col("HIST_START_DATE_COMP").is_null())
                .sum().alias("COMPHIST_MATCHED_set_but_HIST_START_DATE_COMP_null")
            )

        out["flag_column_consistency"] = lf.select([pl.len().alias("n_rows"), *checks]).collect()

    # ---- 4) CIK quality + coverage (only if CIK_final exists)
    if "CIK_final" in schema:
        cik = pl.col("CIK_final").cast(pl.Utf8, strict=False)
        is_10_digits = cik.str.contains(r"^\d{10}$")
        is_all_zeros = cik == "0000000000"

        out["cik_quality"] = (
            lf.select([
                pl.len().alias("n_rows"),
                cik.is_null().sum().alias("n_cik_final_null"),
                (~is_10_digits & cik.is_not_null()).sum().alias("n_cik_final_bad_format"),
                is_all_zeros.sum().alias("n_cik_final_all_zeros"),
                (is_10_digits & ~is_all_zeros).sum().alias("n_cik_final_valid_10d_nonzero"),
            ]).collect()
        )

    # ---- 5) Incremental gain: filled by desc when HCIK missing (if those columns exist)
    if "CIK_desc_10" in schema:
        hcik10 = pl.col("HCIK_10") if "HCIK_10" in schema else pl.lit(None, dtype=pl.Utf8)
        out["cik_source_gain"] = (
            lf.select([
                pl.len().alias("n_rows"),
                hcik10.is_not_null().sum().alias("n_hcik10_nonnull"),
                pl.col("CIK_desc_10").is_not_null().sum().alias("n_desc10_nonnull"),
                pl.col("CIK_final").is_not_null().sum().alias("n_cik_final_nonnull") if "CIK_final" in schema else pl.lit(None).alias("n_cik_final_nonnull"),
                (hcik10.is_null() & pl.col("CIK_desc_10").is_not_null()).sum().alias("n_filled_by_desc_when_hcik_missing"),
            ]).collect()
        )

    # ---- 6) Conflicts: HCIK_10 vs CIK_desc_10
    if "HCIK_10" in schema and "CIK_desc_10" in schema:
        both = pl.col("HCIK_10").is_not_null() & pl.col("CIK_desc_10").is_not_null()
        mismatch = both & (pl.col("HCIK_10") != pl.col("CIK_desc_10"))
        n_both = both.sum()
        n_mismatch = mismatch.sum()

        out["cik_conflicts"] = (
            lf.select([
                pl.len().alias("n_rows"),
                n_both.alias("n_rows_both_sources"),
                n_mismatch.alias("n_rows_mismatch"),
                # Aggregate BEFORE dividing: dividing the per-row `mismatch` mask would
                # produce one value per panel row instead of a single rate. The
                # denominator is floored at 1, so the rate is 0 when no row has both sources.
                (n_mismatch / pl.when(n_both > 0).then(n_both).otherwise(1)).alias("mismatch_rate_given_both"),
            ]).collect()
        )

        if "n_cik_desc_uniq" in schema:
            out["cik_conflicts_by_ambiguity"] = (
                lf.with_columns([
                    pl.when(pl.col("n_cik_desc_uniq").is_null()).then(pl.lit("unknown"))
                      .when(pl.col("n_cik_desc_uniq") <= 1).then(pl.lit("uniq<=1"))
                      .otherwise(pl.lit("uniq>1"))
                      .alias("desc_ambiguity_bucket"),
                    mismatch.alias("is_mismatch"),
                    both.alias("has_both"),
                ])
                .group_by("desc_ambiguity_bucket")
                .agg([
                    pl.len().alias("n_rows"),
                    pl.col("has_both").sum().alias("n_has_both"),
                    pl.col("is_mismatch").sum().alias("n_mismatch"),
                ])
                .collect()
            )

    # ---- 7) Ambiguous mappings impact (top 50 GVKEYs by panel rows)
    if "n_cik_desc_uniq" in schema:
        out["ambiguous_gvkeys_top"] = (
            lf.filter(pl.col("n_cik_desc_uniq") > 1)
              .group_by("KYGVKEY_final")
              .agg([
                  pl.len().alias("n_panel_rows"),
                  pl.max("n_cik_desc_uniq").alias("n_cik_desc_uniq"),
                  pl.col("CIK_desc_10").n_unique().alias("n_desc_10_seen_in_panel") if "CIK_desc_10" in schema else pl.lit(None).alias("n_desc_10_seen_in_panel"),
              ])
              .sort(["n_panel_rows", "n_cik_desc_uniq"], descending=True)
              .head(50)
              .collect()
        )

    return out


# Convenience: build flags dict from your IntFlag class
def build_flags_dict(DataStatus_cls) -> dict[str, int]:
    """Map the upper-case flag names of an IntFlag (or a plain constants class) to ints.

    For Enum/Flag classes the ``__members__`` mapping is used: it lists every member
    name including aliases, whereas ``dir()`` on an enum class only lists canonical
    members (on Python 3.11+ that excludes zero-valued flags such as NONE and
    multi-bit composite flags). Classes without ``__members__`` fall back to ``dir()``.
    """
    members = getattr(DataStatus_cls, "__members__", None)
    if members is not None:
        return {name: int(member) for name, member in members.items() if name.isupper()}
    return {name: int(getattr(DataStatus_cls, name)) for name in dir(DataStatus_cls) if name.isupper()}

from pathlib import Path
import polars as pl


# NOTE: despite its name, this points at the FULL compdesc-enriched panel, not the 10% sample.
final_path_sample = Path('/content/drive/MyDrive/Data_LM/CRSP_Compustat_data/derived_data/final_flagged_data_compdesc_added.parquet')

# To also run the bitmask <-> column consistency checks, pass
# flags=build_flags_dict(DataStatus) (DataStatus is importable from thesis_pkg.pipeline).
diagnostics = run_pipeline_diagnostics(final_path_sample, flags=None)

# Jupyter renders only a cell's last bare expression, so display each summary explicitly.
# .get() tolerates tables that were not produced because their columns are absent.
for diag_name in ("key_health", "cik_quality", "cik_source_gain", "cik_conflicts", "ambiguous_gvkeys_top"):
    print(f"--- {diag_name} ---")
    display(diagnostics.get(diag_name))


KYGVKEY_final,n_panel_rows,n_cik_desc_uniq,n_desc_10_seen_in_panel
i32,u32,u32,u32


In [None]:
# Show the whole dict of diagnostic tables returned by run_pipeline_diagnostics (plain-text repr).
diagnostics

{'schema': shape: (53, 2)
 ┌──────────────────────┬──────────────┐
 │ column               ┆ dtype        │
 │ ---                  ┆ ---          │
 │ str                  ┆ str          │
 ╞══════════════════════╪══════════════╡
 │ KYPERMNO             ┆ Int32        │
 │ CALDT                ┆ Date         │
 │ PRC                  ┆ Float64      │
 │ RET                  ┆ Float64      │
 │ RETX                 ┆ Float64      │
 │ TCAP                 ┆ Float64      │
 │ VOL                  ┆ Float64      │
 │ BIDLO                ┆ Float64      │
 │ ASKHI                ┆ Float64      │
 │ BID                  ┆ Float64      │
 │ ASK                  ┆ Float64      │
 │ NUMTRD               ┆ Int64        │
 │ OPENPRC              ┆ Float64      │
 │ data_status          ┆ UInt64       │
 │ DLSTDT               ┆ Date         │
 │ DLSTCD               ┆ Int32        │
 │ DLPRC                ┆ Float64      │
 │ DLAMT                ┆ Float64      │
 │ DLRET                ┆ Float

In [None]:
import polars as pl
import os
import glob
import time

# --- Configuration ---
input_folder = "/content/drive/My Drive/Data_LM/parquet_data/_year_merged/"
output_folder = "/content/drive/My Drive/Data_LM/parquet_data/"
output_filename = "filings_metadata_1993_2024_LIGHT.parquet"
output_path = os.path.join(output_folder, output_filename)

# --- 1. Verification Phase ---
print("--- Phase 1: Verifying Individual Year Files ---")

# Find all yearly parquet files (the first 4 characters of each name are printed as the year)
parquet_files = sorted(glob.glob(os.path.join(input_folder, "*.parquet")))
valid_files = []
total_rows_detected = 0

if not parquet_files:
    print(f"CRITICAL: No parquet files found in {input_folder}")
    print("Please check if the folder path is correct.")
else:
    print(f"{'Year':<10} | {'Rows':<10} | {'Size (MB)':<10} | {'Status'}")
    print("-" * 55)

    for f_path in parquet_files:
        filename = os.path.basename(f_path)
        file_size_mb = os.path.getsize(f_path) / (1024 * 1024)

        try:
            # Create a LazyFrame (does not read data yet)
            lf = pl.scan_parquet(f_path)

            # collect_schema() resolves only the schema; reading `lf.columns` on a
            # LazyFrame emits a PerformanceWarning in recent Polars releases.
            columns = lf.collect_schema().names()

            # Row count; Polars can typically answer this from parquet metadata.
            row_count = lf.select(pl.len()).collect().item()

            # Hard failures first, so a file lacking 'cik' (or an empty file) is never
            # accepted through the "no text" path.
            if 'cik' not in columns:
                status = "ERR: No CIK"
            elif row_count == 0:
                status = "EMPTY"
            else:
                # Files without 'full_text' still carry usable metadata: keep but flag them,
                # and count their rows like every other accepted file.
                status = "OK" if 'full_text' in columns else "WARN: No Text"
                valid_files.append(f_path)
                total_rows_detected += row_count

            print(f"{filename[:4]:<10} | {row_count:<10,} | {file_size_mb:<10.2f} | {status}")

        except Exception as e:
            # Catch specific errors to avoid crashing the whole loop
            print(f"{filename[:4]:<10} | {'Error':<10} | {file_size_mb:<10.2f} | ERROR: {e}")

    print("-" * 55)
    print(f"Total valid files: {len(valid_files)}")
    print(f"Total records found: {total_rows_detected:,}")


# --- 2. Creation of Light Dataset (Metadata Only) ---
print("\n--- Phase 2: Creating Light Dataset (Excluding Full Text) ---")

if not valid_files:
    print("No valid files to process. Exiting.")
else:
    start_time = time.time()
    try:
        # Drop 'full_text' per file so the heavy column is never read. Scanning each
        # file separately and concatenating diagonally also tolerates yearly files whose
        # schemas differ (e.g. the "WARN: No Text" ones), which a single multi-file
        # scan_parquet may reject.
        lf_light = pl.concat(
            [pl.scan_parquet(f).select(pl.exclude("full_text")) for f in valid_files],
            how="diagonal_relaxed",
        )
        cols_to_keep = lf_light.collect_schema().names()

        # Sort for cleanliness
        if "filing_date" in cols_to_keep and "cik" in cols_to_keep:
            lf_light = lf_light.sort(["filing_date", "cik"])

        print(f"Streaming columns {cols_to_keep} to {output_filename}...")

        # Write to disk
        lf_light.sink_parquet(output_path, compression='zstd')

        end_time = time.time()
        final_size_mb = os.path.getsize(output_path) / (1024 * 1024)

        print(f"SUCCESS: Light dataset created!")
        print(f"Time: {end_time - start_time:.2f}s")
        print(f"Size: {final_size_mb:.2f} MB")

        # --- Final Sanity Check ---
        print("\n--- Final Check (First 5 rows) ---")
        print(pl.read_parquet(output_path, n_rows=5))

    except Exception as e:
        print(f"CRITICAL ERROR during final merge: {e}")

--- Phase 1: Verifying Individual Year Files ---
Year       | Rows       | Size (MB)  | Status
-------------------------------------------------------


  columns = lf.columns


1993       | 13         | 0.34       | OK
1994       | 9,752      | 306.95     | OK
1995       | 21,880     | 520.23     | OK
1996       | 44,531     | 1030.85    | OK
1997       | 55,484     | 1546.29    | OK
1998       | 55,735     | 1705.16    | OK
1999       | 56,330     | 1737.04    | OK
2000       | 59,419     | 1773.69    | OK
2001       | 56,873     | 1728.94    | OK
2002       | 54,220     | 1805.77    | OK
2003       | 50,399     | 1898.30    | OK
2004       | 49,821     | 1964.38    | OK
2005       | 50,717     | 2011.29    | OK
2006       | 49,032     | 2051.16    | OK
2007       | 47,990     | 2076.95    | OK
2008       | 47,216     | 2033.59    | OK
2009       | 42,359     | 2007.85    | OK
2010       | 40,024     | 1935.65    | OK
2011       | 40,087     | 1842.38    | OK
2012       | 37,498     | 1774.04    | OK
2013       | 34,998     | 1722.52    | OK
2014       | 33,993     | 1741.05    | OK
2015       | 32,539     | 1726.02    | OK
2016       | 30,286     | 1667.91 

  available_cols = lf_all.columns


In [None]:
import os
import re
import polars as pl
import shutil
import gc  # Garbage collection
from google.colab import drive

# --- Configuration ---
# force_remount=True re-mounts Drive even if an earlier cell already mounted it.
drive.mount('/content/drive', force_remount=True)
# Source: one archive per year named "{year}.zip"; destination root for the per-year parquet batch folders.
drive_zip_folder = "/content/drive/My Drive/Data_LM/Zip_Files/"
drive_parquet_folder = "/content/drive/My Drive/Data_LM/parquet_data/"

# This will be the single final file
final_output_file = "/content/drive/My Drive/Data_LM/parquet_data/all_filings_1993_2024_combined.parquet"

# Local Colab scratch locations: each year's zip is copied and extracted here before parsing.
colab_temp_zip = "/tmp/data.zip"
colab_extract_path = "/tmp/extracted_data/"

# Batch size: Number of files to hold in RAM before writing to disk
BATCH_SIZE = 2000

# Ensure base output folder exists
os.makedirs(drive_parquet_folder, exist_ok=True)

# --- Compile Regex Patterns ---
# Presumably the number of leading characters of each filing searched for the SEC header
# fields below — confirm against the parsing loop (not visible in this section).
header_search_limit = 5000
# SEC header fields: primary CIK, filing date (YYYYMMDD) and accession number.
cik_pattern_header = re.compile(r"CENTRAL INDEX KEY:\s*(\d+)")
date_pattern_header = re.compile(r"FILED AS OF DATE:\s*(\d{8})")
accession_pattern_header = re.compile(r"ACCESSION NUMBER:\s*([\d-]+)")

# Expected file name layout: <YYYYMMDD>_<type>_edgar_data_<cik>_<accession>.txt
filename_pattern = re.compile(
    r"(\d{8})_"          # 1: Date (YYYYMMDD)
    r"([^_]+)_"          # 2: Type
    r"edgar_data_"
    r"(\d+)_"            # 3: CIK
    r"([\d-]+)"          # 4: Accession Number
    r"\.txt",
    re.IGNORECASE
)

def save_batch(data, year, batch_idx, output_folder, compression='zstd'):
    """Write one batch of parsed filing records to a Parquet file.

    Args:
        data: list of record dicts whose keys match the schema below.
        year: year label used in the output file name.
        batch_idx: batch counter, zero-padded to 4 digits in the file name.
        output_folder: directory that receives
            ``{year}_batch_{batch_idx:04d}.parquet``.
        compression: Parquet codec passed to ``write_parquet`` (default 'zstd').

    Returns:
        The written file path on success, or ``None`` when ``data`` is empty or
        the write failed. Failures are printed rather than raised, so the
        extraction loop keeps going (best effort).
    """
    if not data:
        return None

    parquet_filename = f"{year}_batch_{batch_idx:04d}.parquet"
    output_path = os.path.join(output_folder, parquet_filename)
    # Write to a sibling temp name and rename into place. A failed or
    # interrupted write then never leaves a truncated *.parquet behind that
    # would break the later scan/merge. The ".tmp" suffix keeps it out of
    # "*.parquet" globs.
    tmp_path = output_path + ".tmp"

    schema = {
        'filename': pl.Utf8,
        'cik': pl.Int64,
        'accession_number': pl.Utf8,
        'filing_date': pl.Utf8,
        'document_type': pl.Utf8,
        'cik_header_primary': pl.Utf8,
        'ciks_header_secondary': pl.List(pl.Utf8),
        'accession_header': pl.Utf8,
        'accession_filename': pl.Utf8,
        'filing_date_header': pl.Utf8,
        'filing_date_filename': pl.Utf8,
        'cik_conflict': pl.Boolean,
        'accession_conflict': pl.Boolean,
        'full_text': pl.Utf8
    }

    try:
        df = pl.DataFrame(data, schema=schema)
        # YYYYMMDD string -> Date; unparseable values become null (strict=False).
        df = df.with_columns(
            pl.col("filing_date").str.strptime(pl.Date, "%Y%m%d", strict=False)
        )
        df.write_parquet(tmp_path, compression=compression)
        os.replace(tmp_path, output_path)
        print(f"  Saved batch {batch_idx} with {len(df)} records to {parquet_filename}")
        return output_path
    except Exception as e:
        print(f"  ERROR saving batch {batch_idx}: {e}")
        # Do not leave a partial temp file lying around.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        return None

# --- Processing Loop ---
# For every year: copy <year>.zip from Drive to local disk, extract it, parse
# each .txt filing (file-name fields + SEC header fields), and write the parsed
# records to <drive_parquet_folder>/<year>/<year>_batch_NNNN.parquet in batches
# of BATCH_SIZE records.
years = range(1993, 2025)

for year in years:
    year_str = str(year)
    zip_filename = f"{year_str}.zip"
    source_zip_path = os.path.join(drive_zip_folder, zip_filename)

    # Create a specific folder for this year's parquets to keep things organized
    year_output_folder = os.path.join(drive_parquet_folder, year_str)
    os.makedirs(year_output_folder, exist_ok=True)

    print(f"\n--- Processing Year: {year_str} ---")

    if not os.path.exists(source_zip_path):
        print(f"SKIPPING: Zip file not found at {source_zip_path}")
        continue

    # 1. Copy and Extract on local disk
    try:
        if os.path.exists(colab_extract_path):
            shutil.rmtree(colab_extract_path)
        os.makedirs(colab_extract_path)

        print(f"Copying {zip_filename}...")
        shutil.copyfile(source_zip_path, colab_temp_zip)

        print("Extracting...")
        shutil.unpack_archive(colab_temp_zip, colab_extract_path, 'zip')
    except Exception as e:
        print(f"CRITICAL ERROR during setup for {year_str}: {e}")
        # Drop any half-extracted files from the failed attempt.
        shutil.rmtree(colab_extract_path, ignore_errors=True)
        continue
    finally:
        # Free the local zip copy on success and failure alike.
        if os.path.exists(colab_temp_zip):
            os.remove(colab_temp_zip)

    # Remove batch files written by an earlier run for this year. Batches are
    # overwritten by index only, so if an earlier run produced MORE batches
    # (e.g. a different BATCH_SIZE) the extra files would survive and be
    # merged as duplicate filings.
    stale_batches = [
        name for name in os.listdir(year_output_folder)
        if name.startswith(f"{year_str}_batch_") and name.endswith(".parquet")
    ]
    for name in stale_batches:
        os.remove(os.path.join(year_output_folder, name))
    if stale_batches:
        print(f"Removed {len(stale_batches)} stale batch file(s) from a previous run.")

    # 2. Process Files
    print("Processing text files...")
    report_data = []
    batch_counter = 1

    # Sorted so batch membership (and batch contents) is reproducible across
    # runs; raw os.walk order depends on the filesystem.
    files_iterator = sorted(
        os.path.join(root, name)
        for root, _, names in os.walk(colab_extract_path)
        for name in names
        if name.lower().endswith(".txt")
    )

    total_files = len(files_iterator)
    print(f"Found {total_files} text files.")

    for file_path in files_iterator:
        filename = os.path.basename(file_path)

        # --- Reset per-file extraction variables ---
        header_filing_date_str = None
        header_accession_str = None
        full_text = None
        filename_cik_int = None
        filename_accession_str = None
        filename_date_str = None
        filename_type_str = None
        cik_conflict_flag = False
        acc_conflict_flag = False

        # --- Filename Parsing ---
        fn_match = filename_pattern.match(filename)
        if fn_match:
            # Group 3 matched \d+, so int() cannot fail here.
            filename_date_str, filename_type_str, cik_str, filename_accession_str = fn_match.groups()
            filename_cik_int = int(cik_str)
        else:
            # Unparseable file name: flag both identifiers as unverifiable.
            cik_conflict_flag = True
            acc_conflict_flag = True

        # --- Content Parsing ---
        try:
            # Use errors='replace' to avoid losing data due to a few bad bytes
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                full_text = f.read()

            # Header extraction (only the first header_search_limit characters)
            header_search_area = full_text[:header_search_limit]

            header_ciks = cik_pattern_header.findall(header_search_area)
            header_ciks_int_set = {int(c) for c in header_ciks if c.isdigit()}

            date_match = date_pattern_header.search(header_search_area)
            if date_match:
                header_filing_date_str = date_match.group(1)

            accession_match = accession_pattern_header.search(header_search_area)
            if accession_match:
                header_accession_str = accession_match.group(1)

            # --- Validation Logic ---
            # CIK conflict: file-name CIK absent from a non-empty header CIK set.
            if filename_cik_int and (filename_cik_int not in header_ciks_int_set) and header_ciks_int_set:
                cik_conflict_flag = True

            if filename_accession_str and header_accession_str and (filename_accession_str != header_accession_str):
                acc_conflict_flag = True

            # Prioritize Header values, fallback to filename
            final_accession = header_accession_str or filename_accession_str
            final_date = header_filing_date_str or filename_date_str
            primary_header_cik = header_ciks[0] if header_ciks else None
            secondary_ciks = header_ciks[1:] if len(header_ciks) > 1 else []

            report_data.append({
                'filename': filename,
                'cik': filename_cik_int,
                'accession_number': final_accession,
                'filing_date': final_date,
                'document_type': filename_type_str,
                'cik_header_primary': primary_header_cik,
                'ciks_header_secondary': secondary_ciks,
                'accession_header': header_accession_str,
                'accession_filename': filename_accession_str,
                'filing_date_header': header_filing_date_str,
                'filing_date_filename': filename_date_str,
                'cik_conflict': cik_conflict_flag,
                'accession_conflict': acc_conflict_flag,
                'full_text': full_text
            })

        except Exception as e:
            print(f"Error processing {filename}: {e}")

        # --- Batch Save Trigger ---
        if len(report_data) >= BATCH_SIZE:
            save_batch(report_data, year_str, batch_counter, year_output_folder)
            report_data = []  # Clear RAM
            batch_counter += 1
            gc.collect()  # Force cleanup

    # Save remaining files in the last batch
    if report_data:
        save_batch(report_data, year_str, batch_counter, year_output_folder)

    # Cleanup extracted folder for the year
    if os.path.exists(colab_extract_path):
        shutil.rmtree(colab_extract_path)
    gc.collect()

# --- 3. Merge All Batches into One File ---
import glob

print("\n--- Extraction Phase Complete. Starting Final Merge ---")
print(f"Scanning for batches in {drive_parquet_folder}...")

try:
    # Match only batch files inside the four-digit year subfolders
    # (e.g. parquet_data/2009/*.parquet). A recursive "**" glob also matches
    # files directly in parquet_data/, which is where final_output_file is
    # written, and other folders such as _year_merged/. A re-run would then read
    # earlier merged output back in and duplicate every filing.
    files_pattern = os.path.join(drive_parquet_folder, "[0-9][0-9][0-9][0-9]", "*.parquet")
    print(files_pattern)

    batch_files = sorted(glob.glob(files_pattern))
    if not batch_files:
        raise FileNotFoundError(f"no batch files match {files_pattern}")

    # Lazy load all batch files
    lf = pl.scan_parquet(batch_files)

    print(f"Streaming combined data to: {final_output_file}")

    # Sink to single file using streaming (does not load everything into RAM)
    lf.sink_parquet(final_output_file, compression='zstd')

    print("SUCCESS: Single merged file created!")
    print("You may now manually delete the year folders in 'parquet_data' if you wish to save space.")

except Exception as e:
    print(f"ERROR during final merge: {e}")
    print("Note: Your individual batch files are still safe in the 'parquet_data' subfolders.")

In [None]:
import os, glob, json, time, shutil
import pyarrow as pa
import pyarrow.parquet as pq

# Paths for the per-year merge: merged outputs go to Drive (_year_merged),
# scratch files and the resume checkpoint go under /content.
drive_root = "/content/drive/My Drive/Data_LM/parquet_data"
drive_out_dir = f"{drive_root}/_year_merged"
local_work = "/content/_merge_work"
os.makedirs(drive_out_dir, exist_ok=True)
os.makedirs(local_work, exist_ok=True)

# Resume checkpoint: JSON list of year strings already merged.
# NOTE(review): it lives under /content, so it presumably does not survive a
# runtime reset; years missing from it are simply merged again.
checkpoint_path = f"{local_work}/done_years.json"
if os.path.exists(checkpoint_path):
    with open(checkpoint_path) as ckpt:  # close the handle deterministically
        done = set(json.load(ckpt))
else:
    done = set()

def concat_parquets_arrow(in_files, out_path, batch_size=32_000, compression="zstd",
                          compression_level=1, pause_s=0.2):
    """Stream-concatenate Parquet files into a single Parquet file with PyArrow.

    Inputs are read batch by batch, so at most ``batch_size`` rows of one file
    are held in memory at a time.

    Args:
        in_files: ordered sequence of input Parquet paths. All inputs are
            expected to share the first file's schema.
        out_path: path of the Parquet file to create.
        batch_size: rows per record batch read from each input.
        compression: Parquet codec for the output.
        compression_level: codec level, only passed when ``compression`` is
            "zstd" (otherwise ``None``).
        pause_s: seconds to sleep after each input file (eases load on the
            Drive mount). Set to 0 to disable.

    Returns:
        None. If ``in_files`` is empty, nothing is written.
    """
    writer = None
    n_files = len(in_files)
    try:
        for i, f in enumerate(in_files, 1):
            pf = pq.ParquetFile(f)
            try:
                if writer is None:
                    # Create the writer from the first file's schema, not from the
                    # first batch. The output file then exists (with the right
                    # columns) even if every input has zero rows, so the caller's
                    # subsequent move does not fail.
                    writer = pq.ParquetWriter(
                        out_path, pf.schema_arrow,
                        compression=compression,
                        compression_level=compression_level if compression == "zstd" else None
                    )
                for batch in pf.iter_batches(batch_size=batch_size):
                    writer.write_batch(batch)
            finally:
                # Release the input file handle before moving on.
                pf.close()

            # keep the frontend alive + visible progress
            print(f"  appended {i}/{n_files}: {os.path.basename(f)}", flush=True)
            if pause_s:
                time.sleep(pause_s)  # tiny pause helps the Drive mount not freak out
    finally:
        if writer is not None:
            writer.close()

# Merge each year's batch files into <drive_out_dir>/<year>.parquet,
# resuming from the checkpoint. A year is skipped only when it is recorded as
# done AND its merged file is still present on Drive.
# years = sorted([d for d in os.listdir(drive_root) if d.isdigit() and len(d) == 4])
years = range(1993,2025)


def _save_checkpoint(done_years):
    """Persist the finished years (as a sorted JSON list) to checkpoint_path."""
    with open(checkpoint_path, "w") as ckpt:
        json.dump(sorted(done_years), ckpt)


merge_kwargs = dict(batch_size=32_000, compression="zstd", compression_level=1)

for y in map(str, years):
    out_drive = f"{drive_out_dir}/{y}.parquet"
    already_merged = y in done and os.path.exists(out_drive)
    if already_merged:
        print(f"[skip] {y}", flush=True)
        continue

    files = sorted(glob.glob(f"{drive_root}/{y}/*.parquet"))
    if len(files) == 0:
        print(f"[warn] {y}: no files", flush=True)
        continue

    # Build the merged file on local disk, then move it onto Drive in one go.
    tmp_local = f"{local_work}/{y}.parquet"
    if os.path.exists(tmp_local):
        os.remove(tmp_local)

    print(f"[merge] {y}: {len(files)} files", flush=True)
    concat_parquets_arrow(files, tmp_local, **merge_kwargs)
    shutil.move(tmp_local, out_drive)

    # Only checkpoint after the merged file has landed on Drive.
    done.add(y)
    _save_checkpoint(done)

    print(f"[done] {y} -> {out_drive}", flush=True)
    time.sleep(2)  # bigger pause between years (Drive cooldown)


In [None]:
import os, glob, shutil
import polars as pl

drive_root = "/content/drive/My Drive/Data_LM/parquet_data"
final_on_drive = f"{drive_root}/all_filings_1993_2024_combined.parquet"

# IMPORTANT: write locally first (Drive mount can be slow/odd), then move
tmp_local = "/content/all_filings_1993_2024_combined.parquet"

# Only scan year-subfolders, so you never re-read your already-merged output file.
files = sorted(glob.glob(os.path.join(drive_root, "[0-9][0-9][0-9][0-9]", "*.parquet")))
files = [f for f in files if os.path.abspath(f) != os.path.abspath(final_on_drive)]

# Fail early with a clear message rather than handing Polars an empty source list.
if not files:
    raise FileNotFoundError(f"No batch parquet files found in the year subfolders of {drive_root}")

# Lower memory pressure during scan: no caching, low_memory scan, no parallelism
lf = pl.scan_parquet(
    files,
    cache=False,
    low_memory=True,
    parallel="none",
)

# Sanity-check the plan (it should be essentially Scan -> Sink)
print(lf.explain())

# Bound peak memory during write with small row groups: every row carries a
# filing's full_text, so row groups get large quickly.
lf.sink_parquet(
    tmp_local,
    compression="zstd",
    compression_level=3,
    statistics=False,
    row_group_size=10_000,  # lower this further if rows contain giant strings
)

shutil.move(tmp_local, final_on_drive)
print("SUCCESS")


In [None]:
# Preview only. Collecting the whole LazyFrame would load every filing's
# full_text for 1993-2024 into RAM, which the streaming sink above avoids.
lf.head(5).collect()

In [None]:
# Inspect the path of the single combined parquet file set in the extraction configuration cell.
final_output_file

In [None]:
# Inspect the parquet glob pattern that the batch-merge step built and printed.
files_pattern