# Data Catalog Tutorial

Learn how the `DataCatalog` in `trading_transformers` stages market data, why it can be preferable to bare `pandas` calls, and how derived indicators feed downstream models.


## 1. Setup

We will create a small synthetic OHLCV dataset and stage it through the catalog. Run the cells sequentially inside this notebook.


In [1]:
from pathlib import Path
import json
import logging

import numpy as np
import pandas as pd

from trading_transformers.data import DataCatalog, DataSource
from trading_transformers.features import ContinuousFeatureBuilder
from trading_transformers.logging import configure_logging

TUTORIAL_ROOT = Path('notebooks/_tmp/catalog_demo')
DATA_DIR = TUTORIAL_ROOT / 'raw'
CATALOG_PATH = TUTORIAL_ROOT / 'catalog.json'

DATA_DIR.mkdir(parents=True, exist_ok=True)

configure_logging(level=logging.DEBUG)


## 2. Create a Sample CSV

In real projects the CSV would come from your data lake. Here we synthesise a short price series for ETH/USDT and write it to disk.


In [2]:
np.random.seed(7)
rows = 200
base_price = 1800.0
changes = np.random.normal(0, 5, size=rows)
prices = base_price + np.cumsum(changes)

frame = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=rows, freq='min'),  # 'min' replaces the deprecated 'T' alias
    'open': prices,
    'high': prices + np.random.uniform(0.5, 2.0, size=rows),
    'low': prices - np.random.uniform(0.5, 2.0, size=rows),
    'close': prices + np.random.uniform(-1.0, 1.0, size=rows),
    'volume': np.random.uniform(10, 50, size=rows)
})

CSV_PATH = DATA_DIR / 'ethusdt_1min.csv'
frame.to_csv(CSV_PATH, index=False)
CSV_PATH


PosixPath('notebooks/_tmp/catalog_demo/raw/ethusdt_1min.csv')
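
Because `close` is drawn as `prices + U(-1, 1)` while `high` is `prices + U(0.5, 2.0)`, a synthetic close can land above its own high (or below its low). That is why `close_location` later takes values outside `[0, 1]`. The check below is a minimal sketch for spotting such bars; `find_ohlc_violations` is a hypothetical helper, not part of `trading_transformers`.

```python
import pandas as pd


def find_ohlc_violations(frame: pd.DataFrame) -> pd.DataFrame:
    """Return the bars whose open or close falls outside the [low, high] band."""
    body_high = frame[['open', 'close']].max(axis=1)
    body_low = frame[['open', 'close']].min(axis=1)
    bad = (
        (frame['high'] < body_high)
        | (frame['low'] > body_low)
        | (frame['high'] < frame['low'])
    )
    return frame.loc[bad]


# Hand-made example: the second bar closes above its high.
demo = pd.DataFrame({
    'open': [100.0, 101.0],
    'high': [101.0, 101.5],
    'low': [99.0, 100.5],
    'close': [100.5, 102.0],
})
violations = find_ohlc_violations(demo)
print(violations.index.tolist())
```

For real exchange data a non-empty result usually points to a feed or parsing problem; for this synthetic series it is simply a side effect of how the columns are generated.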

## 3. Plain `pandas` Loading

A direct `pd.read_csv` works, but every script must repeat the same path, format arguments, and context.


In [3]:
pd.read_csv(CSV_PATH, parse_dates=['timestamp']).head()


| | timestamp | open | high | low | close | volume |
|---|---|---|---|---|---|---|
| 0 | 2024-01-01 00:00:00 | 1808.452629 | 1809.85002 | 1807.173035 | 1808.272629 | 17.227607 |
| 1 | 2024-01-01 00:01:00 | 1806.122942 | 1807.128493 | 1804.833548 | 1806.269206 | 27.385978 |
| 2 | 2024-01-01 00:02:00 | 1806.287042 | 1808.264872 | 1805.236009 | 1805.781431 | 35.763478 |
| 3 | 2024-01-01 00:03:00 | 1808.324624 | 1808.998195 | 1807.092296 | 1809.09509 | 48.403755 |
| 4 | 2024-01-01 00:04:00 | 1804.380009 | 1804.958911 | 1802.401734 | 1804.773797 | 20.776147 |


## 4. Register the Dataset with `DataCatalog`

The catalog records:
- a **logical name** (`ethusdt_1min`),
- the **on-disk location**,
- the **format** (CSV/Parquet),
- optional metadata (frequency, tags).

Once registered, other code loads by name instead of juggling file paths.


In [4]:
catalog = DataCatalog(root=TUTORIAL_ROOT)
catalog.register_source(DataSource(
    name='ethusdt_1min',
    path=CSV_PATH,
    fmt='csv',
    frequency='1min',
    metadata={'asset': 'ETH/USDT'}
))

print('Sources:', catalog.list_sources())
print('Catalog root:', catalog.root)
print('Cache dir:', catalog.cache_dir)


Sources: ['ethusdt_1min']
Catalog root: notebooks/_tmp/catalog_demo
Cache dir: notebooks/_tmp/catalog_demo/.cache


### Why Prefer a Catalog?

- **Single source of truth:** scripts, CLIs, and notebooks resolve dataset names consistently.
- **Reproducibility:** the catalog can be committed to git, so experiments know exactly which files were used.
- **Extensibility:** archives, checksums, and metadata live beside the registration.
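
To make the "load by name" idea concrete, here is a toy registry. It is only a sketch of the concept: `SourceEntry` and `MiniCatalog` are hypothetical names invented for illustration and do not mirror how `DataCatalog` is implemented.

```python
import tempfile
from dataclasses import dataclass, field
from pathlib import Path

import pandas as pd


@dataclass
class SourceEntry:
    """Hypothetical stand-in for a registered dataset."""
    name: str
    path: Path
    fmt: str = 'csv'
    metadata: dict = field(default_factory=dict)


class MiniCatalog:
    """Toy name -> entry registry; callers never touch file paths directly."""

    _READERS = {'csv': pd.read_csv, 'parquet': pd.read_parquet}

    def __init__(self):
        self._sources = {}  # maps logical name -> SourceEntry

    def register(self, entry):
        if entry.name in self._sources:
            raise ValueError(f'duplicate source name: {entry.name}')
        if entry.fmt not in self._READERS:
            raise ValueError(f'unsupported format: {entry.fmt}')
        self._sources[entry.name] = entry

    def names(self):
        return sorted(self._sources)

    def load(self, name, **read_kwargs):
        entry = self._sources[name]
        return self._READERS[entry.fmt](entry.path, **read_kwargs)


with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / 'prices.csv'
    pd.DataFrame({'close': [1.0, 2.0, 3.0]}).to_csv(csv_path, index=False)

    mini = MiniCatalog()
    mini.register(SourceEntry(name='prices', path=csv_path))
    loaded = mini.load('prices')

print(mini.names(), len(loaded))
```

The path and format are stated once at registration, so every later call site only needs the logical name.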


### Persisting Catalog State

The in-memory registry can be saved to JSON (`catalog.to_json`). This file holds the registered sources and archive definitions.


In [5]:
catalog.to_json(CATALOG_PATH)
print(CATALOG_PATH)
print(CATALOG_PATH.read_text())


notebooks/_tmp/catalog_demo/catalog.json
{
  "root": "notebooks/_tmp/catalog_demo",
  "sources": [
    {
      "name": "ethusdt_1min",
      "path": "notebooks/_tmp/catalog_demo/raw/ethusdt_1min.csv",
      "fmt": "csv",
      "frequency": "1min",
      "metadata": {
        "asset": "ETH/USDT"
      }
    }
  ],
  "archives": []
}
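
The JSON above records where each file lives but not what it contains. If you want to detect a silently replaced file, one option is to compute a content hash and keep it alongside the other metadata. The sketch below uses only the standard library; storing the hash under a `sha256` metadata key is an assumed convention, not something the catalog is known to interpret.

```python
import hashlib
import tempfile
from pathlib import Path


def file_sha256(path, chunk_size=1 << 20):
    """Stream a file through SHA-256 so large files never sit fully in memory."""
    digest = hashlib.sha256()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b''):
            digest.update(chunk)
    return digest.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / 'sample.csv'
    sample.write_bytes(b'timestamp,close\n2024-01-01,1800.0\n')
    checksum = file_sha256(sample)
    # Hypothetical convention: keep the hash next to the other metadata.
    metadata = {'asset': 'ETH/USDT', 'sha256': checksum}

print(metadata['sha256'])
```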


### Rehydrating a Catalog Elsewhere

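Any process can reconstruct the same catalog by loading the JSON file. Note that relative paths are stored exactly as registered (see the `path` field in the JSON above), so load the catalog from a location where those paths still resolve. Keep the catalog near your data until rebasing support is added.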


In [6]:
loaded_catalog = DataCatalog.from_json(CATALOG_PATH)
loaded_frame = loaded_catalog.load('ethusdt_1min', parse_dates=['timestamp'])
loaded_frame.head()


| | timestamp | open | high | low | close | volume |
|---|---|---|---|---|---|---|
| 0 | 2024-01-01 00:00:00 | 1808.452629 | 1809.85002 | 1807.173035 | 1808.272629 | 17.227607 |
| 1 | 2024-01-01 00:01:00 | 1806.122942 | 1807.128493 | 1804.833548 | 1806.269206 | 27.385978 |
| 2 | 2024-01-01 00:02:00 | 1806.287042 | 1808.264872 | 1805.236009 | 1805.781431 | 35.763478 |
| 3 | 2024-01-01 00:03:00 | 1808.324624 | 1808.998195 | 1807.092296 | 1809.09509 | 48.403755 |
| 4 | 2024-01-01 00:04:00 | 1804.380009 | 1804.958911 | 1802.401734 | 1804.773797 | 20.776147 |
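
Until rebasing is supported, one workaround for the relative-path caveat is to read the JSON yourself and resolve each `path` against a directory you choose. The sketch below relies only on the `sources[].name` and `sources[].path` fields visible in the saved file; `resolve_source_paths` is a hypothetical helper, not a `DataCatalog` method.

```python
import json
import tempfile
from pathlib import Path


def resolve_source_paths(catalog_json, base_dir):
    """Map each source name to an absolute path, resolving relative paths against base_dir."""
    payload = json.loads(Path(catalog_json).read_text())
    resolved = {}
    for source in payload.get('sources', []):
        path = Path(source['path'])
        if not path.is_absolute():
            path = Path(base_dir) / path
        resolved[source['name']] = path.resolve()
    return resolved


with tempfile.TemporaryDirectory() as tmp:
    catalog_file = Path(tmp) / 'catalog.json'
    catalog_file.write_text(json.dumps({
        'root': 'catalog_demo',
        'sources': [
            {'name': 'ethusdt_1min', 'path': 'raw/ethusdt_1min.csv', 'fmt': 'csv'},
        ],
        'archives': [],
    }))
    resolved = resolve_source_paths(catalog_file, base_dir=tmp)

print(resolved['ethusdt_1min'])
```

Absolute paths pass through unchanged, so the helper is safe to run on catalogs that mix both styles.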


## 5. Adding Indicators

The catalog keeps raw datasets. Feature engineering happens downstream so different models can choose their own transformations. For this example we will:

1. Load the frame from the catalog.
2. Apply `ContinuousFeatureBuilder` to compute price- and volume-based indicators (returns, ranges, volume, volatility, and calendar features).
3. Add a simple rolling mean indicator manually.


In [7]:
builder = ContinuousFeatureBuilder()
feature_frame = builder.transform(loaded_frame)
feature_frame['sma_15'] = feature_frame['close'].rolling(15).mean()
feature_frame[['timestamp', 'close', 'log_return', 'hl_range_pct', 'close_location', 'sma_15']].head(10)


2025-09-26 20:54:45 DEBUG [trading_transformers] Adding return features
2025-09-26 20:54:45 DEBUG [trading_transformers] Adding range features
2025-09-26 20:54:45 DEBUG [trading_transformers] Adding volume features
2025-09-26 20:54:45 DEBUG [trading_transformers] Adding volatility features
2025-09-26 20:54:45 DEBUG [trading_transformers] Adding calendar features


| | timestamp | close | log_return | hl_range_pct | close_location | sma_15 |
|---|---|---|---|---|---|---|
| 0 | 2024-01-01 00:00:00 | 1808.272629 | NaN | NaN | 0.410758 | NaN |
| 1 | 2024-01-01 00:01:00 | 1806.269206 | -0.001109 | 0.001269 | 0.625574 | NaN |
| 2 | 2024-01-01 00:02:00 | 1805.781431 | -0.00027 | 0.001677 | 0.180075 | NaN |
| 3 | 2024-01-01 00:03:00 | 1809.09509 | 0.001833 | 0.001055 | 1.050839 | NaN |
| 4 | 2024-01-01 00:04:00 | 1804.773797 | -0.002392 | 0.001414 | 0.92761 | NaN |
| 5 | 2024-01-01 00:05:00 | 1803.713993 | -0.000587 | 0.001815 | 0.305393 | NaN |
| 6 | 2024-01-01 00:06:00 | 1803.691084 | -1.3e-05 | 0.001551 | 0.374125 | NaN |
| 7 | 2024-01-01 00:07:00 | 1794.73613 | -0.004977 | 0.000958 | -0.10966 | NaN |
| 8 | 2024-01-01 00:08:00 | 1800.84355 | 0.003397 | 0.001711 | 0.455654 | NaN |
| 9 | 2024-01-01 00:09:00 | 1803.07758 | 0.00124 | 0.001134 | 0.208145 | NaN |
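
The three builder columns shown above can be reproduced with plain `pandas`. The formulas below were inferred by matching the printed values for the first five bars, not taken from the library source, so treat them as an assumption about what `ContinuousFeatureBuilder` computes. In particular, `hl_range_pct` only matches when the high-low range is divided by the previous close, which also explains why its first row is empty.

```python
import numpy as np
import pandas as pd

# First five bars, copied from the loaded frame printed earlier.
bars = pd.DataFrame({
    'high': [1809.85002, 1807.128493, 1808.264872, 1808.998195, 1804.958911],
    'low': [1807.173035, 1804.833548, 1805.236009, 1807.092296, 1802.401734],
    'close': [1808.272629, 1806.269206, 1805.781431, 1809.09509, 1804.773797],
})

prev_close = bars['close'].shift(1)
indicators = pd.DataFrame({
    # Log of the close-to-close price ratio.
    'log_return': np.log(bars['close'] / prev_close),
    # Bar range scaled by the previous close (inferred, see lead-in).
    'hl_range_pct': (bars['high'] - bars['low']) / prev_close,
    # Position of the close inside the bar: 0 at the low, 1 at the high.
    'close_location': (bars['close'] - bars['low']) / (bars['high'] - bars['low']),
})
print(indicators.round(6))
```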


### Optional Normalisation

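`ContinuousFeatureBuilder` leaves indicators in their native scales. When you need rolling z-scores (for example, to compare regime strength across assets), call `ContinuousFeatureBuilder.normalize` and pass the columns you want to scale. With `window=32`, the `_z` columns are empty in the first ten rows shown below, which is consistent with the rolling window not yet having enough history.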


In [8]:
normalized = ContinuousFeatureBuilder.normalize(
    feature_frame,
    columns=['close_return', 'volume_change'],
    window=32,
    suffix='_z'
)
normalized[['close_return', 'close_return_z', 'volume_change', 'volume_change_z']].head(10)


2025-09-26 20:54:45 DEBUG [trading_transformers] Normalizing column close_return with window 32
2025-09-26 20:54:45 DEBUG [trading_transformers] Normalizing column volume_change with window 32


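| | close_return | close_return_z | volume_change | volume_change_z |
|---|---|---|---|---|
| 0 | NaN | NaN | NaN | NaN |
| 1 | -2.003422 | NaN | 10.158371 | NaN |
| 2 | -0.487775 | NaN | 8.3775 | NaN |
| 3 | 3.313658 | NaN | 12.640277 | NaN |
| 4 | -4.321293 | NaN | -27.627608 | NaN |
| 5 | -1.059804 | NaN | 4.938414 | NaN |
| 6 | -0.022909 | NaN | 7.05753 | NaN |
| 7 | -8.954954 | NaN | -17.09619 | NaN |
| 8 | 6.107421 | NaN | 18.787417 | NaN |
| 9 | 2.23403 | NaN | 14.514971 | NaN |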
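
For intuition, a rolling z-score subtracts the trailing mean and divides by the trailing standard deviation. The sketch below is one common formulation written with plain `pandas`; whether `ContinuousFeatureBuilder.normalize` uses exactly this definition (sample versus population standard deviation, `min_periods`, zero-variance handling) is an assumption. `rolling_zscore` is a hypothetical helper.

```python
import numpy as np
import pandas as pd


def rolling_zscore(series: pd.Series, window: int) -> pd.Series:
    """(x - trailing mean) / trailing std; NaN until `window` observations exist."""
    mean = series.rolling(window).mean()
    std = series.rolling(window).std()
    # Treat a zero standard deviation as undefined rather than dividing by zero.
    return (series - mean) / std.replace(0.0, np.nan)


rng = np.random.default_rng(0)
values = pd.Series(rng.normal(size=100))
z = rolling_zscore(values, window=32)

# The first window - 1 rows have no full window behind them.
print(int(z.isna().sum()))
```

With the default `min_periods`, a 32-row window leaves the first 31 rows undefined, which matches the pattern of empty `_z` values at the top of the output above.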


**Storage note:** derived indicators are not written back into the catalog by default. They live in working data frames, feature stores, or parquet exports managed by your pipeline. This keeps the catalog focused on source-of-truth files while engineered views remain versioned separately if needed.


## 6. Simple Regression Example

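To keep the focus on data handling, we fit a minimal linear regression (via `numpy.linalg.lstsq`) that predicts the next-minute log return from the engineered features. The fit and the reported MAE are both in-sample, so treat the numbers as a smoke test rather than a measure of predictive skill. Any downstream model (Transformers, Chronos, LSTMs) can consume the same feature frame.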


In [9]:
target = feature_frame['log_return'].shift(-1).dropna()
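# DataFrame.bfill() replaces the deprecated fillna(method='bfill'); it back-fills the warm-up NaNs (e.g. the first rows of sma_15).
features = feature_frame.loc[target.index, ['log_return', 'hl_range_pct', 'close_location', 'sma_15']].bfill()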

X = np.column_stack([np.ones(len(features)), features.to_numpy()])
y = target.to_numpy()

coef, *_ = np.linalg.lstsq(X, y, rcond=None)
print('Intercept:', coef[0])
print('Coefficients:', dict(zip(features.columns, coef[1:])))

pred = X @ coef
mae = np.mean(np.abs(pred - y))
print('MAE:', mae)


Intercept: 0.04787527849829896
Coefficients: {'log_return': np.float64(-0.06955782255633337), 'hl_range_pct': np.float64(-0.1678304129757145), 'close_location': np.float64(-0.0009579736099580634), 'sma_15': np.float64(-2.6420720905544744e-05)}
MAE: 0.002225195352537284



## 7. Summary

- Catalogs replace hard-coded file paths with named entries, promoting reproducibility and discoverability.
- The catalog state is stored in JSON (`catalog.json`) under the chosen root (`TUTORIAL_ROOT`).
- Indicators live outside the catalog; compute them after loading so that multiple model families can apply their own transformations.
- Apply `ContinuousFeatureBuilder.normalize` only when you need rolling z-scores; otherwise indicators stay in their native scales.
- Once data is staged, any modelling approach can consume the engineered features. This tutorial used a plain linear regression for illustration.

Next steps: integrate catalog registration into preprocessing scripts, and layer window generation (`WindowGenerator`) when preparing sequences for sequence models.
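
As a rough picture of what window generation involves, the sketch below slices a feature matrix into overlapping look-back windows with a one-step-ahead target using NumPy. It does not use or imitate the `WindowGenerator` API, whose signature is not shown in this tutorial; `make_windows` is a hypothetical helper.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def make_windows(values, window, horizon=1):
    """Return (inputs, targets).

    inputs has shape (n_windows, window, n_features); targets holds column 0
    observed `horizon` rows after the end of each window.
    """
    values = np.asarray(values)
    n_windows = len(values) - window - horizon + 1
    if n_windows <= 0:
        raise ValueError('not enough rows for the requested window and horizon')
    # sliding_window_view appends the window axis last: (rows, features, window).
    views = sliding_window_view(values, window_shape=window, axis=0)
    inputs = views[:n_windows].transpose(0, 2, 1)
    targets = values[window + horizon - 1:, 0]
    return inputs, targets


demo_values = np.arange(20).reshape(10, 2)  # 10 rows, 2 features
inputs, targets = make_windows(demo_values, window=4)
print(inputs.shape, targets.tolist())
```

The returned `inputs` array is a read-only view onto the original data, so copy it before modifying values in place.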
