# MOIRAI Financial Time Series Training Prototype

This notebook implements a prototype for training the MOIRAI model on financial time series data. We'll start with univariate training on SP500 data from 2000-2010, then progress to multivariate training with OHLCV and technical indicators.

## Overview

1. Data Extraction and Preprocessing
2. Feature Engineering
3. Dataset Creation
4. Model Configuration
5. Training Loop
6. Evaluation
7. Main Workflow

## Setup and Imports

In [None]:
# Standard imports
import os
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime, timedelta
from collections.abc import Generator
from typing import Any, Dict, List, Optional, Tuple, Union

# Data handling
import datasets
from datasets import Features, Sequence, Value

# Technical indicators
import pandas_ta as ta

# Machine learning
import torch
import lightning as L
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor

# uni2ts imports
from uni2ts.model.moirai import MoiraiModule, MoiraiForecast, MoiraiFinetune
from uni2ts.distribution import StudentTOutput, NormalOutput, NegativeBinomialOutput, LogNormalOutput, MixtureOutput
from uni2ts.loss.packed import PackedNLLLoss
from uni2ts.data.loader import DataLoader, PackCollate
from uni2ts.eval_util.metrics import MedianMSE, MedianMAE
from uni2ts.eval_util.plot import plot_single

# Set plotting style: apply the matplotlib style sheet first, then the seaborn
# colour palette (kept in this order so the palette is applied last)
plt.style.use('seaborn-v0_8-whitegrid')
sns.set_palette('viridis')

# Set random seeds for reproducibility: seed both NumPy's global RNG and
# PyTorch's RNG with the same fixed value
np.random.seed(42)
torch.manual_seed(42)

# Check if GPU is available: use CUDA when PyTorch reports it, otherwise fall
# back to the CPU, and print the selected device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## 1. Data Extraction and Preprocessing

In [None]:
def extract_ohlcv_data(asset_class, symbol, freq, start_year, end_year, start_month=1, end_month=12,
                       data_root="/home/dev/data/ohlcv"):
    """
    Extract OHLCV data for a specific asset from the Parquet data lake.

    The lake is expected to be hive-partitioned as
    ``asset_class=/freq=/symbol=/year=/month=/part.parquet`` below ``data_root``.
    All year/month partitions of the symbol are matched with ``*`` wildcards and
    then narrowed with a filter on the ``year`` and ``month`` partition columns.
    (Shell-style ``{a..b}`` brace ranges are not expanded by polars, so they
    cannot be used in the path itself.)

    Parameters:
    -----------
    asset_class : str
        Asset class (crypto, fx, equity, etc.)
    symbol : str
        Symbol or ticker
    freq : str
        Frequency (1min, 15min, 1h, 4h, 1d)
    start_year, end_year : int
        First and last year to load (both inclusive)
    start_month, end_month : int
        First and last month to load (both inclusive, default: full year).
        The month window is applied to every selected year.
    data_root : str or path-like
        Root directory of the OHLCV data lake

    Returns:
    --------
    polars.DataFrame
        DataFrame with the columns ts, open, high, low, close and volume,
        sorted by ts

    Raises:
    -------
    ValueError
        If the year range is reversed or the month range is not within 1..12
        in ascending order
    """
    # Fail fast on ranges that could only ever select nothing
    if start_year > end_year:
        raise ValueError(
            f"start_year ({start_year}) must not be greater than end_year ({end_year})"
        )
    if not 1 <= start_month <= end_month <= 12:
        raise ValueError(
            f"months must satisfy 1 <= start_month <= end_month <= 12, "
            f"got start_month={start_month}, end_month={end_month}"
        )

    # Match every year/month partition of the symbol; the requested window is
    # selected below through the hive partition columns
    path_pattern = (
        f"{data_root}/asset_class={asset_class}/freq={freq}/symbol={symbol}"
        "/year=*/month=*/part.parquet"
    )

    # Scan the parquet files lazily; hive_partitioning exposes the key=value
    # path segments (including year and month) as columns
    scan = pl.scan_parquet(
        path_pattern,
        hive_partitioning=True,
    )

    # Cast defensively: partition values are parsed from directory names, so
    # their inferred dtype may vary (e.g. zero-padded months)
    in_requested_window = (
        pl.col("year").cast(pl.Int64).is_between(start_year, end_year)
        & pl.col("month").cast(pl.Int64).is_between(start_month, end_month)
    )

    # Keep only the OHLCV columns, ordered by timestamp
    df = (
        scan.filter(in_requested_window)
        .select(
            pl.col("ts"),
            pl.col("open"),
            pl.col("high"),
            pl.col("low"),
            pl.col("close"),
            pl.col("volume"),
        )
        .sort("ts")
        .collect()
    )

    return df

In [None]:
# Extract SP500 data from 2000 to 2010: daily ("1d") bars for the "SPX" symbol
# in the "index" asset class, using the default month range (full years)
sp500_data = extract_ohlcv_data(
    asset_class="index", 
    symbol="SPX", 
    freq="1d", 
    start_year=2000, 
    end_year=2010
)

# Display the first few rows (as the last expression of the cell, this is the
# cell's rendered output)
sp500_data.head()

## 2. Feature Engineering

In [None]:
def _column_with_prefix(frame, prefix):
    """Return the first column of ``frame`` whose name starts with ``prefix``."""
    for name in frame.columns:
        if str(name).startswith(prefix):
            return frame[name]
    raise KeyError(f"no column starting with {prefix!r} in {list(frame.columns)}")


def add_technical_indicators(df):
    """
    Add technical indicators to OHLCV data.

    Parameters:
    -----------
    df : pandas.DataFrame
        Input data; must contain a 'close' column. It is not modified.

    Returns:
    --------
    pandas.DataFrame
        Copy of ``df`` with the extra columns sma_5, sma_20, ema_5, ema_20,
        macd, macd_signal, rsi_14, bb_upper, bb_middle and bb_lower. Rows that
        contain NaN in any column (e.g. indicator warm-up rows) are dropped.

    Raises:
    -------
    ValueError
        If MACD or the Bollinger Bands could not be computed
    KeyError
        If an expected indicator output column is missing
    """
    # Work on a copy so the caller's DataFrame stays untouched
    result_df = df.copy()
    close = result_df['close']

    # Moving Averages
    result_df['sma_5'] = ta.sma(close, length=5)
    result_df['sma_20'] = ta.sma(close, length=20)
    result_df['ema_5'] = ta.ema(close, length=5)
    result_df['ema_20'] = ta.ema(close, length=20)

    # MACD (library defaults; the original code expects the 12/26/9 columns)
    macd = ta.macd(close)
    if macd is None:
        # NOTE(review): pandas_ta appears to return None instead of raising
        # when the series is too short -- surface that as a clear error
        raise ValueError("MACD could not be computed; is the input too short?")
    result_df['macd'] = _column_with_prefix(macd, 'MACD_')
    result_df['macd_signal'] = _column_with_prefix(macd, 'MACDs_')

    # RSI
    result_df['rsi_14'] = ta.rsi(close, length=14)

    # Bollinger Bands
    bbands = ta.bbands(close, length=20)
    if bbands is None:
        raise ValueError("Bollinger Bands could not be computed; is the input too short?")
    # Look the bands up by prefix: the exact suffix of the output column names
    # (e.g. 'BBU_20_2.0') differs between pandas_ta releases
    result_df['bb_upper'] = _column_with_prefix(bbands, 'BBU_')
    result_df['bb_middle'] = _column_with_prefix(bbands, 'BBM_')
    result_df['bb_lower'] = _column_with_prefix(bbands, 'BBL_')

    # Drop NaN values resulting from indicators calculation
    result_df = result_df.dropna()

    return result_df

## 3. Dataset Creation

In [None]:
def create_univariate_dataset(df, target_col='close', item_id="SPX", freq=None):
    """
    Create a univariate dataset for the MOIRAI model.

    The dataset contains a single example built from one column of ``df``.

    Parameters:
    -----------
    df : pandas.DataFrame
        Time-indexed data; its first index value becomes the series start
    target_col : str
        Column used as the target series (default: 'close')
    item_id : str
        Identifier stored with the series (default: "SPX")
    freq : str, optional
        Frequency string stored with the series. When omitted it is inferred
        with ``pd.infer_freq``. Inference yields None for irregular indexes
        (for example trading calendars with holiday gaps); in that case a
        warning is emitted and a null freq is stored.

    Returns:
    --------
    datasets.Dataset
        Hugging Face dataset with the fields target, start, freq and item_id
    """
    import warnings

    if freq is None:
        freq = pd.infer_freq(df.index)
        if freq is None:
            # Keep the previous behaviour (null freq) but make it visible
            warnings.warn(
                "Could not infer a regular frequency from the index; the dataset "
                "will store a null freq. Pass freq=... explicitly to set one.",
                stacklevel=2,
            )

    def example_gen_func() -> Generator[dict[str, Any], None, None]:
        yield {
            "target": df[target_col].to_numpy(),  # array of shape (time,)
            "start": df.index[0],
            "freq": freq,
            "item_id": item_id,
        }

    features = Features(
        dict(
            target=Sequence(Value("float32")),
            start=Value("timestamp[s]"),
            freq=Value("string"),
            item_id=Value("string"),
        )
    )

    hf_dataset = datasets.Dataset.from_generator(example_gen_func, features=features)
    return hf_dataset

In [None]:
def create_multivariate_dataset(df, target_col='close', feature_cols=None, item_id="SPX", freq=None):
    """
    Create a multivariate dataset for the MOIRAI model.

    The dataset contains a single example: the target column plus a set of
    feature columns stored as ``feat_dynamic_real``.

    Parameters:
    -----------
    df : pandas.DataFrame
        Time-indexed data; its first index value becomes the series start
    target_col : str
        Column used as the target series (default: 'close')
    feature_cols : list of str, optional
        Columns used as dynamic real features. Defaults to every column of
        ``df`` except ``target_col``.
    item_id : str
        Identifier stored with the series (default: "SPX")
    freq : str, optional
        Frequency string stored with the series. When omitted it is inferred
        with ``pd.infer_freq``. Inference yields None for irregular indexes
        (for example trading calendars with holiday gaps); in that case a
        warning is emitted and a null freq is stored.

    Returns:
    --------
    datasets.Dataset
        Hugging Face dataset with the fields target, feat_dynamic_real, start,
        freq and item_id
    """
    import warnings

    if feature_cols is None:
        feature_cols = [col for col in df.columns if col != target_col]

    if freq is None:
        freq = pd.infer_freq(df.index)
        if freq is None:
            # Keep the previous behaviour (null freq) but make it visible
            warnings.warn(
                "Could not infer a regular frequency from the index; the dataset "
                "will store a null freq. Pass freq=... explicitly to set one.",
                stacklevel=2,
            )

    def multivar_example_gen_func() -> Generator[dict[str, Any], None, None]:
        yield {
            "target": df[[target_col]].to_numpy().T,  # array of shape (1, time)
            "feat_dynamic_real": df[feature_cols].to_numpy().T,  # array of shape (features, time)
            "start": df.index[0],
            "freq": freq,
            "item_id": item_id,
        }

    features = Features(
        dict(
            target=Sequence(Sequence(Value("float32")), length=1),
            feat_dynamic_real=Sequence(Sequence(Value("float32")), length=len(feature_cols)),
            start=Value("timestamp[s]"),
            freq=Value("string"),
            item_id=Value("string"),
        )
    )

    hf_dataset = datasets.Dataset.from_generator(multivar_example_gen_func, features=features)
    return hf_dataset

## 4. Model Configuration

In [None]:
def configure_moirai_model(pretrained_model_name="Salesforce/moirai-1.1-R-base"):
    """
    Load a pretrained MOIRAI module and assemble a configuration dictionary.

    Parameters:
    -----------
    pretrained_model_name : str
        Name passed to ``MoiraiModule.from_pretrained``

    Returns:
    --------
    tuple
        ``(model, model_config)``: the loaded module and a dict of settings.
        The dict is only returned alongside the module; this function does not
        apply it to the module.
    """
    # Fetch the pretrained weights first
    pretrained_module = MoiraiModule.from_pretrained(pretrained_model_name)

    # Output distribution: a mixture of three component distributions
    mixture_components = [
        StudentTOutput(),
        NormalOutput(),
        LogNormalOutput(),
    ]

    # Settings for financial time series, returned to the caller as-is
    model_config = dict(
        distr_output=MixtureOutput(components=mixture_components),
        d_model=768,
        num_layers=12,
        patch_sizes=[8, 16, 32, 64],
        max_seq_len=512,
        attn_dropout_p=0.1,
        dropout_p=0.1,
        scaling=True,
    )

    return pretrained_module, model_config

## 5. Training Loop

In [None]:
def train_moirai_model(model, train_dataset, val_dataset=None, epochs=10, batch_size=32, learning_rate=1e-4):
    """
    Fit the MOIRAI model with a Lightning trainer.

    Parameters:
    -----------
    model
        Module handed to ``Trainer.fit``
    train_dataset
        Passed to ``Trainer.fit`` as the second positional argument
    val_dataset : optional
        Passed to ``Trainer.fit`` as the third positional argument
    epochs : int
        Maximum number of epochs
    batch_size : int
        NOTE(review): currently not used by this function
    learning_rate : float
        NOTE(review): currently not used by this function

    Returns:
    --------
    The same ``model`` object that was passed in
    """
    # One GPU with 16-bit precision when CUDA is present, otherwise CPU at 32-bit
    cuda_ready = torch.cuda.is_available()
    trainer_options = {
        "max_epochs": epochs,
        "accelerator": "gpu" if cuda_ready else "cpu",
        "devices": 1,
        "precision": 16 if cuda_ready else 32,
        "gradient_clip_val": 1.0,
    }

    # Build the trainer and run the fit
    L.Trainer(**trainer_options).fit(model, train_dataset, val_dataset)

    return model

## 6. Evaluation

In [None]:
def evaluate_model(model, test_dataset):
    """
    Score the MOIRAI model on test data.

    Parameters:
    -----------
    model
        Model passed to each metric
    test_dataset
        Dataset passed to each metric

    Returns:
    --------
    dict
        Maps "mse" and "mae" to whatever the corresponding metric object
        returns when called with ``(model, test_dataset)``.
        NOTE(review): the call convention of these metric objects is not
        visible in this file -- confirm against uni2ts.
    """
    # Instantiate every metric up front, then evaluate them in order
    scorers = (
        ("mse", MedianMSE()),
        ("mae", MedianMAE()),
    )
    return {label: scorer(model, test_dataset) for label, scorer in scorers}

## 7. Main Workflow

In [None]:
# Main workflow
def main():
    """
    Run the end-to-end prototype workflow.

    Loads daily SPX data for 2000-2010, adds technical indicators, builds the
    univariate and multivariate datasets and saves them to disk, loads the
    pretrained model configuration, and prints the CLI command intended for
    the actual training run.

    Returns:
    --------
    str
        A fixed completion message
    """
    # Step 1: load the raw bars and index them by timestamp in pandas
    raw_bars = extract_ohlcv_data(
        asset_class="index",
        symbol="SPX",
        freq="1d",
        start_year=2000,
        end_year=2010,
    )
    sp500_df = raw_bars.to_pandas().set_index('ts')

    # Step 2: enrich with technical indicators
    enriched = add_technical_indicators(sp500_df)

    # Step 3a: close-only dataset
    create_univariate_dataset(enriched).save_to_disk("sp500_univariate_dataset")

    # Step 3b: close plus a selection of feature columns
    selected_features = ['open', 'high', 'low', 'volume', 'sma_5', 'sma_20', 'rsi_14', 'macd']
    create_multivariate_dataset(
        enriched, feature_cols=selected_features
    ).save_to_disk("sp500_multivariate_dataset")

    # Step 4: load the pretrained model and its configuration (results are not
    # used further in this function)
    _model, _model_config = configure_moirai_model()

    # Step 5: training itself is delegated to the CLI; only print the command
    print("Training would be done using the CLI with the following command:")
    print("python -m cli.train -cp conf/finetune run_name=sp500_univariate model=moirai_1.1_R_base data=sp500_univariate")

    # Summarise what was written
    print("\nDatasets created and saved to disk:")
    for saved_name in ("sp500_univariate_dataset", "sp500_multivariate_dataset"):
        print(f"- {saved_name}")

    return "Workflow completed successfully"




In [None]:
# Run the main workflow when this code is executed as the top-level module
# (its return value is not used here)
if __name__ == "__main__":
    main()