In [20]:
from data_vault import FUTURES
from datetime import datetime
from pathlib import Path
from tqdm.auto import tqdm
import polars as pl
import databento as db
import os
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment.
# By default python-dotenv leaves variables that are already set untouched.
load_dotenv()

# CA bundle that `requests` uses for TLS verification. A value supplied by the
# shell or the .env file takes precedence; the machine-specific path below is
# only a fallback so the notebook is not tied to a single workstation.
os.environ.setdefault("REQUESTS_CA_BUNDLE", "/Users/U10096791/cacert.pem")

# Databento API key read from the environment (None when DB_KEY is not set).
DB_KEY = os.getenv("DB_KEY")

# Root folder for all Databento data, relative to the working directory.
db_path = Path("data/databento")

# Download Publisher Table From Databento

In [26]:
# Fetch the publisher table from the Databento historical metadata endpoint
# and persist it as Parquet under <db_path>/files.
client = db.Historical(DB_KEY)
publishers = pl.DataFrame(client.metadata.list_publishers())

publishers_path = db_path / "files" / "publishers.parquet"
# On a fresh checkout the "files" folder does not exist yet; create it so this
# cell does not depend on the conversion cell below having been run first.
publishers_path.parent.mkdir(parents=True, exist_ok=True)
publishers.write_parquet(publishers_path)

# Convert DBN Files to Parquet


In [None]:
# Walk <db_path>/raw/<asset>/<type>/*.dbn.zst and write every file to
# <db_path>/files/<asset>/<type>/<name>.parquet (zstd-compressed).
raw_path = db_path / "raw"

# One sub-folder per asset; sorted so the processing order is deterministic.
assets = sorted(entry.name for entry in raw_path.iterdir() if entry.is_dir())
pbar_assets = tqdm(assets, desc="Assets")
for asset in pbar_assets:
    # The asset folders were listed from "raw", so they must be resolved
    # there as well (previously the "raw" segment was missing here).
    asset_path = raw_path / asset
    dtypes = sorted(entry.name for entry in asset_path.iterdir() if entry.is_dir())
    pbar_types = tqdm(dtypes, desc="Types")
    for dtype in pbar_types:
        type_path = asset_path / dtype
        dbn_files = sorted(type_path.glob("*.dbn.zst"))
        pbar_files = tqdm(dbn_files, desc="Files")
        for file in pbar_files:
            # DBN -> pandas (index moved into a regular column) -> polars.
            data = db.DBNStore.from_file(file).to_df().reset_index()
            data = pl.from_pandas(data)

            # Path.stem only strips ".zst"; drop the remaining ".dbn" too.
            # Computed outside the f-string because reusing the same quote
            # character inside an f-string is a SyntaxError before Python 3.12.
            base_name = file.stem.replace(".dbn", "")
            fname = f"{base_name}.parquet"

            output_dir = db_path / "files" / asset / dtype
            output_dir.mkdir(parents=True, exist_ok=True)

            output_path = output_dir / fname
            data.write_parquet(output_path, compression="zstd")

# Download Futures Data From Bloomberg

In [None]:
# Specify exchange symbols and bloomberg symbols
# Reason: Databento uses exchange symbols and I would like to stay consistent

# Exchange root symbol -> Bloomberg ticker passed to FUTURES.
contracts = {
    "TFM": "TZTA Comdty",
    "NG": "NGA Comdty",
    "ECF": "MO1 Comdty",
    "BRN": "CO1 Comdty",
    "CL": "CL1 Comdty",
}

start_dt = datetime(2018, 1, 1)
end_dt = datetime.today()

# Make sure the output folder exists before the first write.
bbg_path = Path("data") / "bloomberg"
bbg_path.mkdir(parents=True, exist_ok=True)

for exch_sym, bbg_sym in tqdm(contracts.items()):
    data = FUTURES(bbg_sym)

    # Each fetch_* call returns an object that exposes the next fetch_* method,
    # so the three downloads are chained.
    (
        data
        .fetch_chains(start_date=start_dt, end_date=end_dt)
        .fetch_meta()
        .fetch_ohlc(start_dt)
    )

    # Persist every instance attribute whose name starts with "_" and does not
    # contain "_symbols". The attribute name (including its leading
    # underscore) becomes the file suffix: "<exchange symbol><attribute>.parquet".
    # NOTE(review): assumes each such attribute has a write_parquet method —
    # confirm against the FUTURES class.
    for attribute, tbl in vars(data).items():
        if attribute.startswith("_") and "_symbols" not in attribute:
            fname = bbg_path / f"{exch_sym.lower()}{attribute}.parquet"
            tbl.write_parquet(fname)


# Create Bloomberg Metadata File for Databento

In [2]:
# Value of "id_bb_global_ult_parent_co_name" -> exchange root symbol.
# Values that are not listed here pass through the lookup unchanged.
contract_codes = {
    "TTF Natural Gas Base Load Mont": "TFM",  # ICE Endex Dutch TTF Natural Gas Futures, contract symbol "TFM"
    "NYMEX Henry Hub Natural Gas Fu": "NG",   # CME Group / NYMEX Henry Hub Natural Gas Futures, root symbol "NG"
    "NYMEX Light Sweet Crude Oil Fu": "CL",   # NYMEX Light Sweet Crude Oil Futures, root symbol "CL"
    "ICE Brent Crude Oil Future": "BRN",      # Intercontinental Exchange Brent Crude Futures, root symbol "BRN"
    # ICE EUA (EU Allowance) Futures, contract symbol "C".
    # NOTE(review): the `contracts` mapping in the Bloomberg download cell uses
    # "ECF" for this product ("MO1 Comdty"); confirm which root Databento
    # expects before relying on the generated symbol.
    "Carbon Emissions Future": "C",
}

# Calendar month number -> futures month letter.
month_codes = {
    1: "F",  # January
    2: "G",  # February
    3: "H",  # March
    4: "J",  # April
    5: "K",  # May
    6: "M",  # June
    7: "N",  # July
    8: "Q",  # August
    9: "U",  # September
    10: "V", # October
    11: "X", # November
    12: "Z"  # December
}

# "expiration_month" is a String column ("1".."12") when the month letter is
# looked up, so match on string keys explicitly instead of relying on the
# integer keys being cast implicitly.
month_code_by_str = {str(month): code for month, code in month_codes.items()}

# NOTE(review): the slices below assume "fut_contract_dt" is text shaped like
# "MM/YYYY" (month at offset 0, year at offset 3) — confirm against the data.
contract_dt = pl.col("fut_contract_dt")

bbg_meta = (
    pl.scan_parquet(Path("data") / "bloomberg" / "*_meta.parquet")
    # Temporary string pieces used only to assemble the exchange symbol.
    .with_columns(
        pl.concat_str([pl.lit("00"), contract_dt.str.slice(5, 6)]).alias("expiration_year"),
        # Int round-trip strips the leading zero: "01" -> "1".
        contract_dt.str.slice(0, 2).cast(pl.Int32()).cast(pl.String()).alias("expiration_month"),
    )
    .with_columns(
        pl.col("id_bb_global_ult_parent_co_name").replace(contract_codes).alias("contract_code"),
        pl.col("expiration_month").replace(month_code_by_str).alias("month_code"),
    )
    # Symbol layout: "<contract_code> FM<month_code><expiration_year>!".
    .with_columns(
        pl.concat_str(
            [
                pl.col("contract_code"),
                pl.lit(" FM"),
                pl.col("month_code"),
                pl.col("expiration_year"),
                pl.lit("!"),
            ]
        ).alias("id_full_exchange_symbol"),
    )
    .drop(["expiration_year", "expiration_month", "contract_code", "month_code"])
    # Re-create year and month as Int32 columns for the final table.
    .with_columns(
        pl.concat_str([pl.lit("00"), contract_dt.str.slice(3, 6)]).cast(pl.Int32()).alias("expiration_year"),
        contract_dt.str.slice(0, 2).cast(pl.Int32()).alias("expiration_month"),
    )
    # Keep the Bloomberg ticker under "bbg_symbol" and expose the generated
    # exchange symbol as "symbol".
    .rename(
        {
            "symbol": "bbg_symbol",
            "id_full_exchange_symbol": "symbol",
        }
    )
)

bbg_meta.sink_parquet(Path("data") / "databento" / "files" / "bbg_meta.parquet")

# Create a Table for Settlement Prices Obtained From Bloomberg

In [7]:
# Lazily scan every per-contract Bloomberg OHLC file and rename two columns:
# "symbol" -> "bbg_symbol" and "px_last" -> "px_settlement".
bbg_ohlc_daily = pl.scan_parquet(Path("data", "bloomberg", "*_ohlc.parquet")).rename(
    dict(symbol="bbg_symbol", px_last="px_settlement")
)

# Stream the combined table into the Databento files folder.
bbg_ohlc_daily.sink_parquet(Path("data", "databento", "files", "bbg_ohlc_daily.parquet"))