# Data Handler Demo

WRDS-backed S&P500 data. Run ingestion first (requires WRDS creds):

```bash
python -m src.data_pipeline.ingestion.wrds_ingestion --root . --start 2000-01-01 --end 2025-01-01 --save-raw
```

Then execute the cells below.


## Optional: Test WRDS connectivity

If you have WRDS credentials set in `config/wrds_credentials.yml` (gitignored) or `~/.pgpass`, run the next cell to verify login.

In [None]:
import sys
from pathlib import Path

# Resolve the project root: use the working directory when it contains
# config/, otherwise assume we were launched one level down (e.g. notebooks/)
# and fall back to the parent directory.
cwd = Path.cwd().resolve()
PROJECT_ROOT = cwd if (cwd / 'config').exists() else cwd.parent

# Make project packages importable from this notebook.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

try:
    import wrds
except ImportError as exc:
    raise SystemExit('wrds package not installed in this env. Install via conda/pip and retry.') from exc

# Optional YAML credentials; when the file is absent (or lacks the keys) both
# values stay None and are passed to wrds.Connection as-is.
creds_path = PROJECT_ROOT / 'config' / 'wrds_credentials.yml'
username = password = None
if creds_path.exists():
    import yaml
    data = yaml.safe_load(creds_path.read_text()) or {}
    username, password = data.get('username'), data.get('password')

print('Connecting to WRDS...')
db = wrds.Connection(wrds_username=username, wrds_password=password)
try:
    print('Connected. Available libraries (first 10):')
    print(db.list_libraries()[:10])
finally:
    # Always release the connection, even if listing libraries fails.
    db.close()


## Inspect FF factors table (WRDS `ff_all.fivefactors_daily`)
Check available columns and a few rows to debug missing factor fields.

In [None]:
# Needs WRDS credentials and the PROJECT_ROOT defined by the connectivity cell above
import pandas as pd

try:
    import wrds
except ImportError as exc:
    raise SystemExit('wrds package not installed in this env. Install and retry.') from exc

# Optional YAML credentials; when the file is absent both values stay None
creds_path = PROJECT_ROOT / 'config' / 'wrds_credentials.yml'
username, password = None, None
if creds_path.exists():
    import yaml
    data = yaml.safe_load(creds_path.read_text()) or {}
    username = data.get('username')
    password = data.get('password')

db = wrds.Connection(wrds_username=username, wrds_password=password)
try:
    # Pull a handful of rows so the available column names can be inspected
    info = db.raw_sql('select * from ff_all.fivefactors_daily limit 5')
    display(info.head())
    print('Columns:', info.columns.tolist())
finally:
    # Release the connection whether or not the query succeeded
    db.close()


In [None]:
import sys
from pathlib import Path

# Locate the repo root: the working directory if it holds config/,
# otherwise its parent (covers launching from notebooks/).
cwd = Path.cwd().resolve()
PROJECT_ROOT = cwd if (cwd / 'config').exists() else cwd.parent

# Put the repo root on sys.path once so the `src` package can be imported
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

from src.data_pipeline.storage import LocalParquetDataHandler

# Handler rooted at the project directory; the bare name below shows its repr
handler = LocalParquetDataHandler(data_root=PROJECT_ROOT)
handler


## Inspect data source metadata
Check `data_meta/data_sources.yml` to confirm all tables came from WRDS.


In [None]:
import yaml
from pprint import pprint

# Parse the data-source log when it exists; otherwise `log` stays None
log_path = PROJECT_ROOT / 'data_meta' / 'data_sources.yml'
log = None
if log_path.exists():
    log = yaml.safe_load(log_path.read_text())
pprint(log)


## Dataset overview
Quick view of sources, locations, and columns for each processed dataset.

In [None]:
import yaml
from pathlib import Path
import pandas as pd

# Pick a column-name reader. pyarrow can read just the Parquet schema; if it
# cannot be imported, fall back to pandas.
try:
    from pyarrow import parquet as pq
    def _cols(path: Path):
        """Return the column names of the Parquet file at *path* from its schema."""
        return pq.read_schema(path).names
except Exception:
    def _cols(path: Path):
        """Return the column names of the Parquet file at *path* via pandas."""
        # pandas.read_parquet has no `nrows` option, so load the file and
        # take the column labels from the resulting frame.
        return list(pd.read_parquet(path).columns)

log_path = PROJECT_ROOT / 'data_meta' / 'data_sources.yml'
# `or {}` also covers an empty YAML file, for which safe_load returns None.
log = (yaml.safe_load(log_path.read_text()) if log_path.exists() else None) or {}
rows = []
for name, info in (log.get('datasets') or {}).items():
    if name == 'raw':
        continue
    info = info or {}  # tolerate a dataset entry with no details
    raw_path = info.get('path') or ''
    path = Path(raw_path)
    cols = None
    # Only try to read a schema when a path was actually recorded; an empty
    # path would otherwise resolve to the current directory.
    if raw_path and path.exists():
        try:
            cols = _cols(path)
        except Exception as exc:
            # Keep the overview usable: report the failure in the table.
            cols = [f'<failed to read cols: {exc}>']
    rows.append({
        'dataset': name,
        'source': info.get('source'),
        'path': str(path),
        'columns': cols,
    })
pd.DataFrame(rows)


In [None]:
# Universe on a specific date: preview what the handler returns for 2020-01-02.
# `handler` is the LocalParquetDataHandler created in an earlier cell;
# .head() assumes a pandas-like object is returned.
handler.get_universe("2020-01-02").head()

In [None]:
# Prices subset: request the close and volume fields for AAPL and MSFT with
# start_date 2020-01-01 and end_date 2020-02-01, then preview the result.
handler.get_prices(["AAPL", "MSFT"], start_date="2020-01-01", end_date="2020-02-01", fields=["close", "volume"]).head()

In [None]:
# Returns subset: request AAPL returns with start_date 2020-01-02 and
# end_date 2020-02-01, then preview the result.
handler.get_returns(["AAPL"], start_date="2020-01-02", end_date="2020-02-01").head()

In [None]:
# Fundamentals: request AAPL fundamentals with start_date 2019-01-01 and
# end_date 2021-12-31, then preview the result.
handler.get_fundamentals(["AAPL"], start_date="2019-01-01", end_date="2021-12-31").head()

In [None]:
# Macro series sample: request macro data with start_date 2019-01-01 and
# end_date 2021-12-31, then preview the result.
handler.get_macro(start_date="2019-01-01", end_date="2021-12-31").head()

## Test FRED API
Uses the API key from `config/fred_credentials.yml` (checked first) or the `FRED_API_KEY` environment variable to pull a small sample of the `CPIAUCSL` series.

In [None]:
import os
import requests

# Key lookup order: the credentials file first, then the FRED_API_KEY
# environment variable.
api_key = None
cred_path = PROJECT_ROOT / 'config' / 'fred_credentials.yml'
if cred_path.exists():
    import yaml
    data = yaml.safe_load(cred_path.read_text()) or {}
    api_key = data.get('api_key')
api_key = api_key or os.environ.get('FRED_API_KEY')

# The FRED API requires an api_key on every request; fail fast with a clear
# message instead of sending a request that is rejected with an HTTP error.
if not api_key:
    raise SystemExit('FRED API key not found. Set api_key in config/fred_credentials.yml or export FRED_API_KEY.')

params = {
    'series_id': 'CPIAUCSL',
    'observation_start': '2020-01-01',
    'observation_end': '2020-12-31',
    'file_type': 'json',
    'api_key': api_key,
}
resp = requests.get('https://api.stlouisfed.org/fred/series/observations', params=params, timeout=10)
resp.raise_for_status()
# Show only the first five observations as a connectivity check.
obs = resp.json().get('observations', [])[:5]
obs


In [None]:
# Style factor returns sample: request style factor returns with start_date
# 2019-01-01 and end_date 2021-12-31, then preview the result.
handler.get_style_factor_returns(start_date="2019-01-01", end_date="2021-12-31").head()

In [None]:
# Benchmark returns sample: request returns for the "^GSPC" benchmark with
# start_date 2019-01-01 and end_date 2021-12-31, then preview the result.
handler.get_benchmark_returns("^GSPC", start_date="2019-01-01", end_date="2021-12-31").head()

## Inspect CRSP delist returns
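Check availability of delisting returns (`dlret`) by querying `crspsamp_all.stkdelists` and `crsp.msedelist`; both queries are attempted independently, and a failure of either is printed rather than raised.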

In [None]:
import pandas as pd
try:
    import wrds
except ImportError as exc:
    raise SystemExit('wrds package not installed in this env. Install and retry.') from exc

# Optional YAML credentials; when the file is absent both values stay None
creds_path = PROJECT_ROOT / 'config' / 'wrds_credentials.yml'
username, password = None, None
if creds_path.exists():
    import yaml
    data = yaml.safe_load(creds_path.read_text()) or {}
    username = data.get('username')
    password = data.get('password')

db = wrds.Connection(wrds_username=username, wrds_password=password)
try:
    # Probe each table on its own so one failure does not hide the other
    try:
        dl = db.raw_sql('select * from crspsamp_all.stkdelists limit 5')
        print('crspsamp_all.stkdelists rows:', dl.shape[0])
        display(dl)
    except Exception as err:
        print('crspsamp_all.stkdelists unavailable:', err)
    try:
        dlm = db.raw_sql('select * from crsp.msedelist limit 5')
        print('crsp.msedelist rows:', dlm.shape[0])
        display(dlm)
    except Exception as err:
        print('crsp.msedelist unavailable:', err)
finally:
    # Release the connection regardless of which probes succeeded
    db.close()


## Inspect Compustat security table for IPO fields
List available columns in `comp.security` (check for IPO date equivalent).

In [None]:
import pandas as pd
try:
    import wrds
except ImportError as exc:
    raise SystemExit('wrds package not installed in this env. Install and retry.') from exc

# Optional YAML credentials; when the file is absent both values stay None
creds_path = PROJECT_ROOT / 'config' / 'wrds_credentials.yml'
username, password = None, None
if creds_path.exists():
    import yaml
    data = yaml.safe_load(creds_path.read_text()) or {}
    username = data.get('username')
    password = data.get('password')

db = wrds.Connection(wrds_username=username, wrds_password=password)
try:
    # A five-row sample is enough to list the table's columns
    sample = db.raw_sql('select * from comp.security limit 5')
    print('Columns in comp.security:', sample.columns.tolist())
    display(sample)
except Exception as exc:
    # Report the failure instead of raising so the notebook keeps running
    print('comp.security unavailable:', exc)
finally:
    db.close()
