# Quantlab Data Pipeline Demo

Run the pipeline end to end: ingest raw data, export failures, refetch them, transform, run quality checks, clean invalid rows, and sample the final data. Before starting, make sure `credentials.yml` exists at the repo root and contains your Alpha Vantage and WRDS credentials.
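
A quick pre-flight check can catch an incomplete credentials file before any API call is made. The sketch below assumes the file has already been parsed into a dict. The `wrds_username` and `wrds_password` keys are the ones read later in this notebook; `alpha_vantage_key` is a hypothetical name used only for illustration, and the helper itself is not part of the package.

```python
def missing_credentials(creds, required):
    """Return the required keys that are absent or empty in creds."""
    return [key for key in required if not creds.get(key)]


# Hypothetical parsed contents of credentials.yml (placeholder values)
example_creds = {
    "wrds_username": "jdoe",
    "wrds_password": "",
    "alpha_vantage_key": "DEMO",  # assumed key name, not confirmed by the package
}
required_keys = ("wrds_username", "wrds_password", "alpha_vantage_key")

missing = missing_credentials(example_creds, required_keys)
print(missing)
```

An empty string counts as missing here, which is usually what you want for a password field left blank in a template.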


In [1]:
from datetime import date
import logging
import sys
from pathlib import Path

# Ensure the repo root is on sys.path so the package imports without an install
try:
    root = Path(__file__).resolve().parents[1]
except NameError:
    # __file__ is undefined in a notebook, so fall back to the parent of the working directory
    root = Path.cwd().resolve().parent
if str(root) not in sys.path:
    sys.path.insert(0, str(root))

from quantlab_data_pipeline import run_ingestion, transform_raw_to_final, run_quality_checks, get_final_data
from quantlab_data_pipeline.failure_utils import export_all_failures, refetch_failures, clean_final_invalid_calls
from quantlab_data_pipeline.logging_utils import configure_logging

configure_logging(level=logging.INFO)


## 1) Ingest raw data

Fetch Alpha Vantage and WRDS data into `../data/data-raw/`. Pass `resume=False` to force a full refetch, and adjust `sleep_seconds` and `use_paid_key` as needed.
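
One plausible reading of the `resume` flag is "skip any output file that already exists". The sketch below illustrates that decision rule only. The helper `needs_fetch` and the file names are hypothetical, and the package's actual resume logic may differ.

```python
from pathlib import Path
import tempfile


def needs_fetch(target: Path, resume: bool) -> bool:
    """Fetch when not resuming, or when the output file does not exist yet."""
    return (not resume) or (not target.exists())


with tempfile.TemporaryDirectory() as tmp:
    existing = Path(tmp) / "AAPL_prices.parquet"  # hypothetical file name
    existing.touch()
    absent = Path(tmp) / "MSFT_prices.parquet"    # hypothetical file name

    decisions = {
        "existing, resume=True": needs_fetch(existing, resume=True),
        "existing, resume=False": needs_fetch(existing, resume=False),
        "absent, resume=True": needs_fetch(absent, resume=True),
    }

print(decisions)
```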


In [None]:
# run_ingestion(sleep_seconds=1.0, use_paid_key=True, resume=False)


## 2) Export failures (invalid API responses)

Scan the raw Parquet files for `Invalid API call` responses and write the matches to `../data/data-processed/failures_all.csv`. Run this after ingestion.
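
To make the scan concrete, here is a minimal pandas sketch of finding rows where any column contains the marker string. It is an illustration under assumptions, not the implementation of `export_all_failures`; the helper name and the `ticker`/`payload` columns are made up for the example.

```python
import pandas as pd

MARKER = "Invalid API call"


def rows_with_marker(df: pd.DataFrame, marker: str = MARKER) -> pd.DataFrame:
    """Return the rows in which at least one column contains the marker text."""
    as_text = df.astype(str)  # compare every column as plain text
    mask = as_text.apply(
        lambda col: col.str.contains(marker, regex=False)
    ).any(axis=1)
    return df[mask]


# Toy raw frame with hypothetical column names
raw = pd.DataFrame(
    {
        "ticker": ["AAPL", "MSFT", "IBM"],
        "payload": ["ok", "Error: Invalid API call. Please retry.", "ok"],
    }
)
failures = rows_with_marker(raw)
print(failures)
```

Matching with `regex=False` treats the marker as a literal substring, so punctuation in the marker cannot be misread as a pattern.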


In [None]:
fail_path = export_all_failures()
fail_path


## 3) Refetch failures (optional)

Use the exported CSV to refetch the failed endpoints and overwrite the corresponding raw Parquet files, then re-export the failures to see what remains.


In [None]:
# refetch_failures(fail_path, use_paid_key=True, sleep_seconds=1.0)
# fail_path = export_all_failures()
# fail_path


## 4) Transform raw to final

Build the cleaned final Parquet files. This step drops price rows with a null date and strips the noisy `Information` column from the fundamentals.
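
The two cleaning rules described above can be sketched in pandas as follows. This is an illustration, not the body of `transform_raw_to_final`. The `Information` column name comes from the description; the `date` column name, the helper names, and the toy values are assumptions.

```python
import pandas as pd


def drop_null_dates(prices: pd.DataFrame, date_col: str = "date") -> pd.DataFrame:
    """Remove price rows whose date is missing."""
    return prices.dropna(subset=[date_col]).reset_index(drop=True)


def strip_information(fundamentals: pd.DataFrame) -> pd.DataFrame:
    """Drop the Information column if present; do nothing otherwise."""
    return fundamentals.drop(columns=["Information"], errors="ignore")


# Toy inputs with placeholder values
prices = pd.DataFrame(
    {"date": ["2024-01-02", None, "2024-01-03"], "close": [10.0, 11.0, 12.0]}
)
fundamentals = pd.DataFrame(
    {"ticker": ["AAPL"], "totalRevenue": [100.0], "Information": ["rate limit note"]}
)

clean_prices = drop_null_dates(prices)
clean_fundamentals = strip_information(fundamentals)
print(clean_prices)
print(clean_fundamentals)
```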


In [None]:
outputs = transform_raw_to_final()
outputs


## 5) Quality checks

Report missing values in detail and run price consistency and bounds checks across all final datasets.
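
As a rough idea of what such checks look like, the sketch below counts missing values per column and flags price rows whose high/low bounds do not enclose the open and close. The helper names, the OHLC column names, and the exact rules are assumptions for illustration; `run_quality_checks` may apply different ones.

```python
import pandas as pd


def top_missing(df: pd.DataFrame, n: int = 5) -> pd.Series:
    """Columns with the most missing values, largest first."""
    counts = df.isna().sum()
    return counts[counts > 0].sort_values(ascending=False).head(n)


def inconsistent_price_rows(df: pd.DataFrame) -> pd.DataFrame:
    """Rows where low/high fail to bracket open/close, or any price is <= 0."""
    body = df[["open", "close"]]
    bad_bounds = (df["low"] > body.min(axis=1)) | (df["high"] < body.max(axis=1))
    non_positive = (df[["open", "high", "low", "close"]] <= 0).any(axis=1)
    return df[bad_bounds | non_positive]


# Toy prices: row 1 has a high below its close, row 2 has a missing close
prices = pd.DataFrame(
    {
        "open": [10.0, 10.0, 10.0],
        "high": [11.0, 9.5, 11.0],
        "low": [9.0, 9.0, 9.0],
        "close": [10.5, 10.5, None],
    }
)
missing_report = top_missing(prices)
bad_rows = inconsistent_price_rows(prices)
print(missing_report)
print(bad_rows)
```

Missing prices are reported by the first check and deliberately not flagged by the second, so each problem shows up in exactly one report.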


In [4]:
reports = run_quality_checks("all", top_missing=5)
reports


## 6) Clean remaining invalid API rows in finals

Drop any rows in final Parquets that still contain `Invalid API call`.


In [2]:
removed = clean_final_invalid_calls(dataset="all")
removed


## 7) Sample final data

Query a subset of tickers/date range to validate the outputs.
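
Conceptually, sampling is a filter on ticker membership and an inclusive date range. The pandas sketch below shows that filter on a toy frame. The helper name and the `ticker`/`date` columns are assumptions, and this is not how `get_final_data` is necessarily implemented.

```python
from datetime import date

import pandas as pd


def filter_sample(df, tickers, start_date, end_date):
    """Keep rows for the given tickers within [start_date, end_date]."""
    dates = pd.to_datetime(df["date"])
    mask = (
        df["ticker"].isin(tickers)
        & (dates >= pd.Timestamp(start_date))
        & (dates <= pd.Timestamp(end_date))
    )
    return df[mask]


# Toy final dataset with placeholder prices
final = pd.DataFrame(
    {
        "ticker": ["AAPL", "AAPL", "MSFT", "IBM"],
        "date": ["2024-01-02", "2024-02-01", "2024-01-03", "2024-01-02"],
        "close": [1.0, 2.0, 3.0, 4.0],
    }
)
sample = filter_sample(final, ["AAPL", "MSFT"], date(2024, 1, 1), date(2024, 1, 5))
print(sample)
```

Converting the bounds with `pd.Timestamp` avoids comparing a datetime column directly against `datetime.date` objects, which pandas does not handle consistently across versions.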


In [6]:
from IPython.display import display

# Examples of pulling final datasets.
# A cell renders only its last expression, so display each sample explicitly.

# Price sample
df_price = get_final_data(tickers=["AAPL", "MSFT"], start_date=date(2024, 1, 1), end_date=date(2024, 1, 5))
display(df_price.head())

# Fundamentals sample (income statement)
df_is = get_final_data(dataset="fundamentals_income_statement")
display(df_is.head())

# Company overview sample
df_co = get_final_data(dataset="company_overview", start_date=date(2024, 1, 1), end_date=date(2024, 1, 5))
display(df_co.head())


## 8) Fetch Fama-French factors (WRDS only)


In [None]:
from datetime import date
from quantlab_data_pipeline.wrds_client import fetch_ff_factors
from quantlab_data_pipeline.paths import raw_data_dir
from quantlab_data_pipeline.config_loader import load_credentials

creds = load_credentials()
ff = fetch_ff_factors(
    start=date(2000, 1, 1),
    end=date(2025, 11, 20),
    username=creds['wrds_username'],
    password=creds['wrds_password'],
)
out_path = raw_data_dir().parent / 'data-processed' / 'FAMA_FRENCH_FACTORS.parquet'
out_path.parent.mkdir(parents=True, exist_ok=True)
ff.to_parquet(out_path, index=False)
out_path
