In [15]:
"""
ETL: Filter SEC companyfacts from 2017 to current year of SP500 companies.
--------------------------------------------------------------------------
This script uses DuckDB as a faster, im-memory query engine.
No .duckdb file is created on disk.
"""

import os
import duckdb
import pandas as pd

In [16]:
# =============== Environment Setup ===============
RAW_COMPANYFACTS_PATH = "../../../data/raw/sec_filing/companyfacts.csv"
SP500_CSV = "../../../data/raw/S&P500.csv"
MAPPING_PATH = "../../../data/raw/sec_filing/SP500_cik_mapping.csv"
OUTPUT_DIR = "../../../data/raw/fundamentals/"
FILTER_START_DATE = "2017-01-01"

os.makedirs(OUTPUT_DIR, exist_ok=True)

In [18]:
# Load and merge SP500 and CIK mapping
sp500 = pd.read_csv(SP500_CSV)
cik_mapping = pd.read_csv(MAPPING_PATH)

# Normalize column names
sp500.columns = [col.strip().lower() for col in sp500.columns]
cik_mapping.columns = [col.strip().lower() for col in cik_mapping.columns]

# Merge to add cik
sp500_merged = sp500.merge(cik_mapping, on="symbol", how="left")

matched = sp500_merged["cik"].notna().sum()
print(f"Matched {matched} out of {len(sp500_merged)} companies with CIK.")

Matched 504 out of 504 companies with CIK.


In [19]:
# =============== Connect to DuckDB ===============
con = duckdb.connect(database=':memory:')
con.execute("PRAGMA threads=8;")

con.register("sp500_df", sp500_merged)
con.execute("CREATE TEMP TABLE sp500 AS SELECT * FROM sp500_df;")

# =============== Stream-read companyfacts with DuckDB ===============
con.execute(f"""
    CREATE VIEW companyfacts AS
    SELECT * FROM read_csv_auto(
        '{RAW_COMPANYFACTS_PATH}',
        ALL_VARCHAR=TRUE,
        SAMPLE_SIZE=-1,
        AUTO_DETECT=TRUE
    );
""")

cols = con.execute("PRAGMA table_info('companyfacts');").fetchdf()
print("Columns in companyfacts:")
print(cols[["name", "type"]])

Columns in companyfacts:
           name     type
0      column00  VARCHAR
1           cik  VARCHAR
2    entityName  VARCHAR
3   companyFact  VARCHAR
4           end  VARCHAR
5           val  VARCHAR
6          accn  VARCHAR
7            fy  VARCHAR
8            fp  VARCHAR
9          form  VARCHAR
10        filed  VARCHAR
11        units  VARCHAR


In [20]:
cf_cols = [r[1].lower() for r in con.execute("PRAGMA table_info('companyfacts');").fetchall()]

# join by cik
if "cik" not in cf_cols:
    raise ValueError("CIK column not found in companyfacts data.")

join_sql = "FROM companyfacts cf JOIN sp500 s on CAST(cf.cik AS BIGINT) = CAST(s.cik AS BIGINT)"

# detect date column
date_col_candidates = ["filed", "end", "period_end", "acceptance_datetime"]
date_col = next((c for c in date_col_candidates if c in cf_cols), None)
if not date_col:
    raise ValueError("No valid date column found (expected filed/end/period_end).")

filter_sql = f"""
            SELECT *
            {join_sql}
            WHERE TRY_CAST({date_col} AS DATE) >= DATE '{FILTER_START_DATE}';
            """
print("Filter SQL preview:\n", filter_sql[:300], "...")

Filter SQL preview:
 
            SELECT *
            FROM companyfacts cf JOIN sp500 s on CAST(cf.cik AS BIGINT) = CAST(s.cik AS BIGINT)
            WHERE TRY_CAST(filed AS DATE) >= DATE '2017-01-01';
             ...


In [23]:
# =============== Export to Parquet ===============
# con.execute("CREATE TEMP VIEW filtered AS " + filter_sql)

# Add derived year for partitioning
con.execute(f"""
    CREATE TEMP VIEW filtered_with_year AS
    SELECT *,
           STRFTIME(TRY_CAST({date_col} AS DATE), '%Y') AS year
    FROM filtered;
""")

# Partition by year and symbol if available
partition_cols = ["year"]
if "symbol" in sp500_merged.columns:
    partition_cols.append("symbol")
part_expr = ", ".join(partition_cols)

# Export Parquet (no database saved)
con.execute(f"""
    COPY (SELECT * FROM filtered_with_year)
    TO '{OUTPUT_DIR}'
    (FORMAT PARQUET, PARTITION_BY ({part_expr}), COMPRESSION 'ZSTD');
""")

print(f"Parquet successfully written to {OUTPUT_DIR}")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Parquet successfully written to ../../../data/raw/fundamentals/


In [24]:
# =============== Cleanup ===============
con.close()