# WRDS Data Download (CRSP + Compustat + CCM + IBES)

This notebook pulls WRDS datasets and saves them as Parquet files in the `WRDS` subfolder of your configured `DATA_DIR` (the folder is created if it does not exist).

- Make sure `WRDS_USERNAME` is set in `.env`.
- Run cells in order from top to bottom.

In [14]:
from pathlib import Path
import sys

import numpy as np
import pandas as pd
from pandas.tseries.offsets import MonthEnd
import pyarrow as pa
import pyarrow.parquet as pq
import wrds

sys.path.insert(0, str((Path("..") / "src").resolve()))
from settings import config

# All extracts from this notebook are written under DATA_DIR/WRDS.
data_dir = Path(config("DATA_DIR")).joinpath("WRDS")
data_dir.mkdir(exist_ok=True, parents=True)

# WRDS login: the username comes from the project settings; the resulting
# connection object is reused by every query cell below.
username = config("WRDS_USERNAME")
conn = wrds.Connection(wrds_username=username)

def write_parquet(df, path):
    """Save ``df`` to ``path`` as a Parquet file, without the pandas index.

    The DataFrame is converted to a ``pyarrow.Table`` explicitly (instead of
    using ``DataFrame.to_parquet``) to sidestep pandas/pyarrow extension-type
    issues.
    """
    pq.write_table(pa.Table.from_pandas(df, preserve_index=False), path)

# Connectivity check: a trivial query whose result (one row, ok = 1) is shown
# as the cell output, confirming the WRDS session works.
conn.raw_sql("select 1 as ok")

Loading library list...
Done


Unnamed: 0,ok
0,1


# CRSP monthly

In [2]:
# CRSP Block
# Monthly stock file joined with the name history valid on each date (share
# code, exchange code, SIC, NCUSIP) and with any delisting event in the same
# calendar month. The WHERE conditions on b.* keep common shares (shrcd 10/11)
# on NYSE/AMEX/Nasdaq (exchcd 1-3); because they filter the left-joined table
# they also drop months with no matching name record.
crsp_m = conn.raw_sql("""
    select a.permno, a.permco, a.date, a.ret, a.retx, a.shrout, a.prc, a.cfacshr,
           b.shrcd, b.exchcd, b.siccd, b.ncusip,
           c.dlstcd, c.dlret
    from crsp.msf as a
    left join crsp.msenames as b
      on a.permno=b.permno and b.namedt<=a.date and a.date<=b.nameendt
    left join crsp.msedelist as c
      on a.permno=c.permno AND date_trunc('month', a.date) = date_trunc('month', c.dlstdt)
    where a.date between '01/01/1986' and '12/31/2019'
      and b.exchcd between 1 and 3
      and b.shrcd between 10 and 11
""", date_cols=["date"])

crsp_m[["permco", "permno", "shrcd", "exchcd"]] = crsp_m[["permco", "permno", "shrcd", "exchcd"]].astype(int)
# Align every observation to its calendar month end for later merges.
crsp_m["YearMonth"] = crsp_m["date"] + MonthEnd(0)

# Delisting return: when a delisting code exists but CRSP reports no delisting
# return, impute -50%; months without a delisting contribute 0.
crsp_m["dlret"] = np.where(crsp_m["dlstcd"].notna() & crsp_m["dlret"].isna(), -0.5, crsp_m["dlret"])
crsp_m["dlret"] = crsp_m["dlret"].fillna(0)

# Delisting-adjusted return: compound the regular and delisting returns.
# Previously the compounded value was overwritten by the raw `ret`, so the
# delisting return was lost whenever `ret` was present. If `ret` is missing
# but a delisting return exists, fall back to the delisting return alone.
compounded_ret = (1 + crsp_m["ret"]) * (1 + crsp_m["dlret"]) - 1
crsp_m["retadj"] = np.where(
    crsp_m["ret"].isna() & (crsp_m["dlret"] != 0),
    crsp_m["dlret"],
    compounded_ret,
)

crsp_m = crsp_m.sort_values(by=["permno", "YearMonth"]).reset_index(drop=True)

write_parquet(crsp_m, data_dir / "crsp_m.parquet")

# Compustat Annual

In [3]:
def _growth_rate(current, lagged, years=1):
    """Annualized growth ``(current / lagged) ** (1 / years) - 1``.

    A zero lagged value yields NaN instead of +/-inf. For multi-year horizons
    a negative ratio (no real fractional root) is set to NaN up front; the
    power would produce NaN anyway, but with a RuntimeWarning.
    """
    ratio = current / lagged.replace(0, np.nan)
    if years != 1:
        ratio = ratio.where(ratio >= 0)
    return ratio ** (1 / years) - 1


comp = conn.raw_sql("""
    select a.gvkey, a.datadate, a.fyear, a.csho, a.at, a.pstkl, a.txditc,
           a.pstkrv, a.seq, a.pstk, a.ppegt, a.invt, a.lt, a.sich, a.ib, a.oancf,
           a.act, a.dlc, a.che, a.lct, a.dvc, a.epspi, a.epspx,
           a.ajex,
           a.sale, a.ao, a.prcc_f
    from comp.funda as a
    where indfmt='INDL'
      and datafmt='STD'
      and popsrc='D'
      and consol='C'
      and curcd = 'USD'
      and datadate between '01/01/1986' and '12/31/2019'
""", date_cols=["datadate"])

# Preferred stock: pstkrv, else pstkl, else pstk, else 0.
comp["ps"] = np.where(comp["pstkrv"].isnull(), comp["pstkl"], comp["pstkrv"])
comp["ps"] = np.where(comp["ps"].isnull(), comp["pstk"], comp["ps"])
comp["ps"] = np.where(comp["ps"].isnull(), 0, comp["ps"])

# Book equity = seq + txditc - preferred stock, with missing txditc as 0.
comp["txditc"] = comp["txditc"].fillna(0)
comp["be"] = comp["seq"] + comp["txditc"] - comp["ps"]

comp.sort_values(by=["gvkey", "datadate"], inplace=True)

# NOTE(review): every lag/diff below means "previous row of the same gvkey",
# not "previous fiscal year"; gaps in fyear or two datadates within one fyear
# are not detected. Confirm this is acceptable for the downstream signals.

# Working-capital accruals from year-over-year changes. Missing components are
# treated as 0 only for this calculation, on a temporary copy, so the raw
# act/dlc/che/lct columns keep their NaNs in the saved file and in ratios
# computed later (previously a missing `act` became 0 everywhere).
_wc_filled = comp[["act", "dlc", "che", "lct"]].fillna(0)
comp[["act_ch", "dlc_ch", "che_ch", "lct_ch"]] = _wc_filled.groupby(comp["gvkey"]).diff()
comp["acc"] = comp["act_ch"] + comp["dlc_ch"] - comp["che_ch"] - comp["lct_ch"]

comp["at_l1"] = comp.groupby("gvkey")["at"].shift(1)
comp["at_avg"] = comp[["at", "at_l1"]].mean(axis=1)
# Asset growth (same as per-firm pct_change), but zero prior assets give NaN, not inf.
comp["ag"] = _growth_rate(comp["at"], comp["at_l1"])
comp["ppegt_diff"] = comp.groupby("gvkey")["ppegt"].diff()
comp["ao_diff"] = comp.groupby("gvkey")["ao"].diff()
comp["sale_l1"] = comp.groupby("gvkey")["sale"].shift(1)
comp["sale_l3"] = comp.groupby("gvkey")["sale"].shift(3)
comp["sale_l5"] = comp.groupby("gvkey")["sale"].shift(5)
# Sales growth: 1-year simple, 3- and 5-year annualized.
comp["sg_1y"] = _growth_rate(comp["sale"], comp["sale_l1"])
comp["sg_3y"] = _growth_rate(comp["sale"], comp["sale_l3"], years=3)
comp["sg_5y"] = _growth_rate(comp["sale"], comp["sale_l5"], years=5)

# Net share issuance: log change in shares scaled by the adjustment factor
# ajex. Non-positive or undefined ratios become NaN; np.log previously turned
# them into -inf/inf with a RuntimeWarning.
comp["adj_csho"] = comp["csho"] * comp["ajex"]
comp["adj_csho_l1"] = comp.groupby("gvkey")["adj_csho"].shift(1)
_share_ratio = comp["adj_csho"] / comp["adj_csho_l1"].replace(0, np.nan)
comp["nsi"] = np.log(_share_ratio.where(_share_ratio > 0))

write_parquet(comp, data_dir / "compa.parquet")

  result = getattr(ufunc, method)(*inputs2, **kwargs)


In [15]:
print("Fetching Compustat Quarterly data...")
_fundq_sql = """
    select gvkey, datadate, fyearq, fqtr, 
           atq, ltq, niq, saleq, ceqq, 
           cshprq, epspxq, ajexq, rdq
    from comp.fundq
    where indfmt='INDL'
      and datafmt='STD'
      and popsrc='D'
      and consol='C'
      and curcdq = 'USD'
      and datadate between '01/01/1986' and '12/31/2019'
"""
comp_q = conn.raw_sql(_fundq_sql, date_cols=["datadate", "rdq"])

# Basic processing: order by firm and period, then derive share and ratio fields.
comp_q = comp_q.sort_values(by=["gvkey", "datadate"])
comp_q["adj_cshprq"] = comp_q["cshprq"] * comp_q["ajexq"]
for _ratio_col, _denom_col in (("net_margin_q", "saleq"), ("roe_q", "ceqq")):
    # A zero denominator becomes NaN so the ratio is NaN rather than +/-inf.
    comp_q[_ratio_col] = comp_q["niq"] / comp_q[_denom_col].replace(0, np.nan)

write_parquet(comp_q, data_dir / "comp_quarterly.parquet")
print(f"Compustat Quarterly saved: {len(comp_q)} rows.")

Fetching Compustat Quarterly data...
Compustat Quarterly saved: 1314056 rows.


# CCM link

In [4]:
# Compustat-CRSP link history (gvkey -> permno): link types whose first letter
# is 'L', with linkprim 'C' or 'P'.
# NOTE(review): open-ended links presumably come back with NULL linkenddt
# (NaT after date parsing); downstream date-range filters must treat NaT as
# "still active" -- confirm against the consumer of ccm.parquet.
_ccm_sql = """
    select gvkey, lpermno as permno, linktype, linkprim,
           linkdt, linkenddt
    from crsp.ccmxpf_linktable
    where substr(linktype, 1, 1) = 'L'
      and (linkprim = 'C' or linkprim = 'P')
"""
ccm = conn.raw_sql(_ccm_sql, date_cols=["linkdt", "linkenddt"])
write_parquet(ccm, data_dir / "ccm.parquet")

# Financial ratios (computed from Compustat annual)

In [5]:
# Manual financial ratios derived from Compustat (fallback when WRDS ratios are not accessible)
def _safe_div(numerator, denominator):
    denom = denominator.replace(0, np.nan)
    return numerator / denom

_ratio_inputs = [
    "gvkey", "datadate", "fyear",
    "at", "lt", "act", "lct", "che", "invt",
    "sale", "ib", "be", "prcc_f", "csho",
]
financial_ratios = comp[_ratio_inputs].copy()

# Ratio name -> (numerator, denominator). Dict insertion order fixes the order
# in which the ratio columns are appended.
_fr = financial_ratios
_ratio_definitions = {
    "current_ratio": (_fr["act"], _fr["lct"]),
    "quick_ratio": (_fr["act"] - _fr["invt"], _fr["lct"]),
    "cash_ratio": (_fr["che"], _fr["lct"]),
    "invt_turn": (_fr["sale"], _fr["invt"]),
    "asset_turn": (_fr["sale"], _fr["at"]),
    "prof_margin": (_fr["ib"], _fr["sale"]),
    "roe": (_fr["ib"], _fr["be"]),
    "roa": (_fr["ib"], _fr["at"]),
    "de_ratio": (_fr["lt"], _fr["be"]),
    "debt_assets": (_fr["lt"], _fr["at"]),
}
for _ratio_name, (_num, _den) in _ratio_definitions.items():
    financial_ratios[_ratio_name] = _safe_div(_num, _den)

# Market cap from fiscal-year-end price and shares, then book-to-market.
financial_ratios["mktcap"] = financial_ratios["prcc_f"] * financial_ratios["csho"]
financial_ratios["bm"] = _safe_div(financial_ratios["be"], financial_ratios["mktcap"])

write_parquet(financial_ratios, data_dir / "financial_ratio.parquet")

# IBES unadjusted summary

In [9]:
from datetime import datetime
import pandas as pd

# Configuration: sample window (also read by the IBES actuals cell below).
start_date = "1986-01-01"
end_date = "2019-12-31"

# Table candidates, tried in order:
# 'statsumu' is the Unadjusted table (preferred for the paper).
# 'statsum' is the Adjusted table (backup).
summary_candidates = [
    ("ibes.statsumu_epsus", "statpers"),
    ("ibes.statsum_epsus", "statpers"),
]

ibes_summary = None
summary_exc = None  # last query error; reported below if no candidate works

for table, date_col in summary_candidates:
    try:
        print(f"Attempting to fetch data from {table}...")

        # SQL logic:
        # 1. measure='EPS': earnings-per-share estimates only.
        # 2. fpi='6': one forecast horizon only (IBES forecast-period indicator
        #    for fiscal quarter 1, the next quarter to be reported). Fixing the
        #    horizon avoids duplicate rows for different horizons.
        # 3. {date_col}: restrict the statistical period date to the window.
        sql = f"""
            SELECT * FROM {table} 
            WHERE measure = 'EPS' 
              AND fpi = '6' 
              AND {date_col} BETWEEN '{start_date}' AND '{end_date}'
        """

        ibes_summary = conn.raw_sql(sql, date_cols=[date_col])

        if ibes_summary is not None and not ibes_summary.empty:
            print(f"Success! Retrieved {len(ibes_summary)} rows from {table}.")
            break
        # Previously an empty result was skipped silently.
        print(f"{table} returned no rows; trying the next candidate.")

    except Exception as exc:
        print(f"Failed to query {table}. Error: {exc}")
        summary_exc = exc
        # The "select * without date filter" fallback is deliberately skipped
        # to avoid out-of-memory errors on the full table.
        continue

# Final Check and Save
if ibes_summary is None or ibes_summary.empty:
    print(
        "Error: IBES summary tables are not accessible or returned no rows under the specified filters."
    )
    # Surface the captured exception instead of discarding it.
    if summary_exc is not None:
        print(f"Last query error: {summary_exc}")
else:
    # Save the result to a Parquet file
    output_file = data_dir / "Forecast_EPS_summary_unadjusted_1986_2019.parquet"
    write_parquet(ibes_summary, output_file)
    print(f"Data successfully saved to {output_file}")

    # Data Validation: with fpi fixed, (ticker, statpers) is expected to be a
    # unique key; report the largest group size if it is not.
    max_dups = ibes_summary.groupby(['ticker', 'statpers']).size().max()
    if max_dups > 1:
        print(f"Warning: Detected {max_dups} duplicate entries for the same ticker/date. Check FPI settings.")
    else:
        print("Validation Passed: Data is unique per ticker and statistical period.")

Attempting to fetch data from ibes.statsumu_epsus...
Success! Retrieved 1513539 rows from ibes.statsumu_epsus.
Data successfully saved to /Users/yandong/Documents/GitHub/p02_van_binsbergen_han_lopez-lira_2022/p02_van_binsbergen_han_lopez_lira_2022/_data/WRDS/Forecast_EPS_summary_unadjusted_1986_2019.parquet
Validation Passed: Data is unique per ticker and statistical period.


# IBES unadjusted actual

In [17]:
def _get_table_columns(table):
    """Return the lower-cased column names of ``table`` (``schema.table``).

    Names are read from ``information_schema.columns`` in ordinal order. The
    list is empty when the table has no columns visible to this account.
    """
    schema_name, table_name = table.split(".")
    query = f"""
        select column_name
        from information_schema.columns
        where table_schema = '{schema_name}' and table_name = '{table_name}'
        order by ordinal_position
        """
    column_frame = conn.raw_sql(query)
    return [name.lower() for name in column_frame["column_name"].tolist()]

def _build_ibes_query(table, columns):
    select_cols = "*"
    filters = []
    if "measure" in columns:
        filters.append("measure='EPS'")
    if "pdicity" in columns:
        filters.append("pdicity in ('ANN','QTR')")
    date_col = None
    for candidate in ["anndats", "actdats", "statpers", "fpedats"]:
        if candidate in columns:
            date_col = candidate
            break
    if date_col:
        filters.append(f"{date_col} between '{start_date}' and '{end_date}'")
    where_clause = ""
    if filters:
        where_clause = "where " + " and ".join(filters)
    sql = f"""
        select {select_cols}
        from {table}
        {where_clause}
    """
    return sql, [date_col] if date_col else []

summary_actual_candidates = [
    "ibes.actu_epsus",
    "ibes.act_epsus",
]

ibes_summary_actual = None
summary_actual_exc = None  # last query error; reported below if no candidate works
for table in summary_actual_candidates:
    try:
        cols = _get_table_columns(table)
        if not cols:
            # No visible columns: the table is missing or unreadable. Querying it
            # anyway would build an unfiltered `select *` with no measure/date
            # filters, i.e. a potential full-table pull.
            print(f"{table}: no visible columns; skipping.")
            continue
        sql, date_cols = _build_ibes_query(table, cols)
        result = conn.raw_sql(sql, date_cols=date_cols)
    except Exception as exc:
        summary_actual_exc = exc
        print(f"Failed to query {table}. Error: {exc}")
        continue
    # Previously an empty result was accepted as success and saved.
    if result.empty:
        print(f"{table} returned no rows; trying the next candidate.")
        continue
    ibes_summary_actual = result
    break

if ibes_summary_actual is None:
    try:
        available_tables = conn.raw_sql(
            """
            select table_name
            from information_schema.tables
            where table_schema = 'ibes'
            order by table_name
            """
        )
        tables_list = ", ".join(available_tables["table_name"].tolist())
    except Exception:
        # Listing tables is diagnostic only; never fail the cell over it.
        tables_list = "<not visible>"
    print(
        "IBES summary actual tables are not accessible in this account. "
        "Skipping IBES summary actual. Available tables: ",
        tables_list,
    )
    if summary_actual_exc is not None:
        print(f"Last query error: {summary_actual_exc}")
else:
    write_parquet(ibes_summary_actual, data_dir / "Actual_EPS_summary_unadjusted_1986_2019.parquet")

# CRSP-IBES link table

In [11]:
def _find_first(columns, candidates):
    for name in candidates:
        if name in columns:
            return name
    return None

def _load_iclink_from_table(table):
    """Load an IBES-ticker / CRSP-permno link from ``table`` (``schema.table``).

    The table's columns are discovered through ``information_schema`` and
    mapped onto a fixed output schema (permno, ticker, sdate, edate, score)
    by trying known column-name aliases. Optional columns that are missing
    are selected as typed NULLs, so the result always has all five columns.

    Returns
    -------
    pandas.DataFrame or None
        ``None`` when no permno-like column exists. Otherwise the query
        result, with ``sdate``/``edate`` parsed as dates. When a score column
        exists, only rows with score <= 1 are kept (presumably the
        better-quality matches -- confirm per table).
    """
    schema, tbl = table.split(".")
    cols_df = conn.raw_sql(
        f"""
        select column_name
        from information_schema.columns
        where table_schema = '{schema}' and table_name = '{tbl}'
        order by ordinal_position
        """
    )
    columns = [c.lower() for c in cols_df["column_name"].tolist()]

    permno_col = _find_first(columns, ["permno", "lpermno", "permno_crsp", "permno_crspn"])
    if permno_col is None:
        return None

    ticker_col = _find_first(columns, ["ticker", "ibtic", "tic", "ibes_ticker"])
    sdate_col = _find_first(
        columns,
        ["sdate", "startdate", "linkdt", "begdate", "start_date", "link_start"],
    )
    edate_col = _find_first(
        columns,
        ["edate", "enddate", "linkenddt", "end_date", "link_end"],
    )
    score_col = _find_first(columns, ["score", "linkscore", "quality"])

    # (source column or None, output alias, SQL type for the NULL placeholder)
    optional_fields = [
        (ticker_col, "ticker", "text"),
        (sdate_col, "sdate", "date"),
        (edate_col, "edate", "date"),
        (score_col, "score", "numeric"),
    ]
    select_cols = [f"{permno_col} as permno"]
    for source_col, alias, null_type in optional_fields:
        if source_col:
            select_cols.append(f"{source_col} as {alias}")
        else:
            select_cols.append(f"NULL::{null_type} as {alias}")

    # Filter on the source column, not the alias: PostgreSQL does not allow
    # SELECT-list aliases in WHERE, so the former `where score <= 1` failed
    # whenever the score column was named `linkscore` or `quality`.
    where_clause = f"where {score_col} <= 1" if score_col else ""

    sql = f"""
        select {', '.join(select_cols)}
        from {table}
        {where_clause}
    """
    return conn.raw_sql(sql, date_cols=["sdate", "edate"])

iclink_candidates = [
    "wrdsapps.id_ibes",
    "wrdsapps.id_ibes_ccm",
    "wrdsapps.ibcrsphist",
    "wrdsapps.opcrsphist",
    "wrdsapps.firm_ratio_ibes",
    "wrdsapps.firm_ratio_ibes_ccm",
]

# Use the first candidate that has a permno column and returns rows. `iclink`
# stays None otherwise; previously an empty DataFrame left over from an
# earlier candidate could slip past the `is None` check below.
iclink = None
last_exc = None  # last query error; reported below if no candidate works
for table in iclink_candidates:
    try:
        candidate = _load_iclink_from_table(table)
    except Exception as exc:
        last_exc = exc
        continue
    if candidate is not None and not candidate.empty:
        iclink = candidate
        break

if iclink is None:
    print(
        "No IBES-CRSP link table with permno found in wrdsapps. "
        "Skipping ICLINK.",
    )
    if last_exc is not None:
        print(f"Last query error: {last_exc}")
else:
    crsp_ibes = crsp_m.merge(iclink, on="permno", how="left", indicator=True)
    if "sdate" in crsp_ibes.columns and "edate" in crsp_ibes.columns:
        # Keep matched CRSP months that fall inside the link window. Unmatched
        # rows were already dropped before (comparisons against NaT are
        # False); the indicator makes that explicit. A NULL bound -- e.g. the
        # source table had no date column, so the loader selected NULL::date
        # -- is treated as open-ended instead of discarding every row.
        # NOTE(review): several links for one permno-month (different tickers
        # or scores) will duplicate that CRSP row.
        in_window = (
            (crsp_ibes["_merge"] == "both")
            & (crsp_ibes["sdate"].isna() | (crsp_ibes["YearMonth"] >= crsp_ibes["sdate"]))
            & (crsp_ibes["edate"].isna() | (crsp_ibes["YearMonth"] <= crsp_ibes["edate"]))
        )
        crsp_ibes = crsp_ibes[in_window]
    crsp_ibes = crsp_ibes.drop(columns="_merge")

    write_parquet(crsp_ibes, data_dir / "crsp_ibes_linked.parquet")

In [12]:
# Release the WRDS connection opened in the setup cell; rerun that cell before
# querying again.
conn.close()