In [1]:
import warnings
import numpy as np
import pandas as pd
from pathlib import Path
import os
import vectorbt as vbt
import io
import sys
from contextlib import redirect_stdout
from datetime import datetime
import math
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# NOTE(review): `warnings` is already imported at the top of this cell; this
# second import is redundant but harmless.
import warnings
# Installs a blanket "ignore" filter: no category or module is given, so every
# warning raised after this point is suppressed, including the
# FutureWarning/DeprecationWarning messages that libraries use to announce
# API removals.
warnings.filterwarnings('ignore')





In [2]:
import pandas as pd
import numpy as np
from typing import Dict, Tuple, List, Union, Optional
from datetime import datetime
from xbbg import blp

# Module-level default mappings. Each key is a (Bloomberg ticker, Bloomberg field)
# pair and each value is the column name the series receives in the output frame.
# DataFetcher falls back to these when no custom mapping is supplied.
_OAS_FIELD = 'INDEX_OAS_TSY_BP'
_LAST_PRICE_FIELD = 'PX_LAST'
_ER_YTD_FIELD = 'INDEX_EXCESS_RETURN_YTD'

# Credit indices queried for their option-adjusted spread.
_OAS_SERIES = (
    ('I05510CA Index', 'cad_oas'),
    ('LF98TRUU Index', 'us_hy_oas'),
    ('LUACTRUU Index', 'us_ig_oas'),
)

# Series queried for their last price / level.
# NOTE(review): 'USYC3M30 Index' is published as 'us_3m_10y' -- confirm the
# column name matches the tenor of that ticker.
_LAST_PRICE_SERIES = (
    ('SPTSX Index', 'tsx'),
    ('VIX Index', 'vix'),
    ('USYC3M30 Index', 'us_3m_10y'),
    ('BCMPUSGR Index', 'us_growth_surprises'),
    ('BCMPUSIF Index', 'us_inflation_surprises'),
    ('LEI YOY  Index', 'us_lei_yoy'),
    ('.HARDATA G Index', 'us_hard_data_surprises'),
    ('CGERGLOB Index', 'us_equity_revisions'),
    ('.ECONREGI G Index', 'us_economic_regime'),
)

# Credit indices queried for their year-to-date excess return.
_ER_YTD_SERIES = (
    ('I05510CA Index', 'cad_ig_er'),
    ('LF98TRUU Index', 'us_hy_er'),
    ('LUACTRUU Index', 'us_ig_er'),
)

# Insertion order is preserved: spread series first, then last-price series.
DEFAULT_OHLC_MAPPING = {
    **{(ticker, _OAS_FIELD): column for ticker, column in _OAS_SERIES},
    **{(ticker, _LAST_PRICE_FIELD): column for ticker, column in _LAST_PRICE_SERIES},
}

DEFAULT_ER_YTD_MAPPING = {
    (ticker, _ER_YTD_FIELD): column for ticker, column in _ER_YTD_SERIES
}

class DataFetcher:
    """
    Fetch Bloomberg history through xbbg and assemble it into one DataFrame.

    The fetcher pulls two groups of series: level/spread data described by
    ``ohlc_mapping`` and year-to-date excess returns described by
    ``er_ytd_mapping``. The excess-return series are chained into index levels,
    the two groups are merged, known bad prints are repaired, and the result is
    optionally trimmed to the first fully populated date.
    """

    def __init__(
        self,
        start_date: str = '2002-01-01',
        end_date: Optional[str] = None,
        periodicity: str = 'D',
        align_start: bool = True,
        fill: str = 'ffill',
        start_date_align: str = 'yes',
        ohlc_mapping: Optional[Dict[Tuple[str, str], str]] = None,
        er_ytd_mapping: Optional[Dict[Tuple[str, str], str]] = None
    ):
        """
        Initialize the DataFetcher class with all configuration parameters

        Args:
            start_date: Start date in YYYY-MM-DD format
            end_date: End date in YYYY-MM-DD format (defaults to current date)
            periodicity: Data frequency passed to Bloomberg as ``Per`` ('D' for daily)
            align_start: Stored on the instance; no method in this class reads it
            fill: Fill method ('ffill'/'pad' or 'bfill'/'backfill'); a falsy value disables filling
            start_date_align: 'yes' trims the result to the first date on which every
                column has a value; any other value keeps all rows
            ohlc_mapping: Custom mapping for price data (defaults to predefined mapping)
            er_ytd_mapping: Custom mapping for excess return data (defaults to predefined mapping)
        """
        # Set default end date to today if not provided
        if end_date is None:
            self.end_date = datetime.now().strftime('%Y-%m-%d')
        else:
            self.end_date = end_date

        # Store all parameters as instance attributes
        self.start_date = start_date
        self.periodicity = periodicity
        self.align_start = align_start
        self.fill = fill
        self.start_date_align = start_date_align

        # Work on copies so that editing an instance's mapping can never mutate
        # the module-level defaults (or the caller's dict) shared by other instances.
        self.ohlc_mapping = dict(ohlc_mapping) if ohlc_mapping is not None else dict(DEFAULT_OHLC_MAPPING)
        self.er_ytd_mapping = dict(er_ytd_mapping) if er_ytd_mapping is not None else dict(DEFAULT_ER_YTD_MAPPING)

        # Known bad prints, keyed by 'YYYY-MM-DD' date string. Each entry names
        # the affected column and the repair action applied by clean_data().
        self.bad_dates = {
            '2005-11-15': {'column': 'cad_oas', 'action': 'use_previous'}
        }

    def update_parameters(self, **kwargs):
        """
        Update any of the class parameters

        All keys are validated before anything is changed, so a call containing
        an unknown key leaves the instance untouched.

        Args:
            **kwargs: Any parameter to update; each key must name an existing
                instance attribute

        Raises:
            AttributeError: If a key does not name an existing instance attribute
        """
        # Only instance attributes are configurable; this also stops a keyword
        # from silently replacing one of the methods.
        configurable = vars(self)
        for key in kwargs:
            if key not in configurable:
                raise AttributeError(f"DataFetcher has no attribute '{key}'")

        for key, value in kwargs.items():
            setattr(self, key, value)

        # If end_date is updated to None, set it to current date
        if 'end_date' in kwargs and kwargs['end_date'] is None:
            self.end_date = datetime.now().strftime('%Y-%m-%d')

    def _apply_fill(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Fill missing values according to ``self.fill``.

        Uses ``DataFrame.ffill`` / ``DataFrame.bfill`` directly because the
        ``method`` argument of ``fillna`` is deprecated in recent pandas releases.

        Args:
            df: DataFrame to fill

        Returns:
            Filled DataFrame, or ``df`` itself when ``self.fill`` is falsy

        Raises:
            ValueError: If ``self.fill`` is not a recognised fill method
        """
        if not self.fill:
            return df
        if self.fill in ('ffill', 'pad'):
            return df.ffill()
        if self.fill in ('bfill', 'backfill'):
            return df.bfill()
        raise ValueError(f"Unsupported fill method: {self.fill!r}")

    def fetch_bloomberg_data(self, mapping: Optional[Dict[Tuple[str, str], str]] = None) -> pd.DataFrame:
        """
        Fetch data from Bloomberg using xbbg using class parameters

        Args:
            mapping: Optional override for the mapping to use

        Returns:
            DataFrame with one column per mapping entry that Bloomberg returned,
            named after the mapping values. Requested (ticker, field) pairs that
            are absent from the response are skipped.
        """
        # Use provided mapping or default to ohlc_mapping
        mapping_to_use = mapping if mapping is not None else self.ohlc_mapping

        # Nothing to request: skip the Bloomberg round trip entirely
        if not mapping_to_use:
            return pd.DataFrame()

        # Sorted so the request is deterministic from run to run
        securities = sorted({security for security, _ in mapping_to_use})
        fields = sorted({field for _, field in mapping_to_use})

        # Fetch data using xbbg
        df = blp.bdh(
            tickers=securities,
            flds=fields,
            start_date=self.start_date,
            end_date=self.end_date,
            Per=self.periodicity
        )

        # Build the renamed frame in one go, in mapping order
        renamed_columns = {
            new_name: df[key]
            for key, new_name in mapping_to_use.items()
            if key in df.columns
        }
        return pd.DataFrame(renamed_columns, index=df.index)

    def convert_er_ytd_to_index(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Convert excess return YTD data to an index, only for securities in er_ytd_mapping

        The period return is the change in the year-to-date figure. On the first
        observation of a new calendar year the YTD figure has restarted, so the
        figure itself is used as the return instead of the difference against
        the previous year's final value. Missing observations are skipped when
        differencing, so the return across a gap is not lost.

        Args:
            df: DataFrame containing excess return YTD columns (in percent),
                with an index that ``pd.to_datetime`` can parse

        Returns:
            DataFrame with one ``<column>_index`` column per converted series.
            The first observation of each series is NaN (it has no prior value
            to difference against) and the level is based at 100 there.
        """
        result = pd.DataFrame(index=df.index)

        # Only convert columns that are in the er_ytd_mapping values
        er_columns = set(self.er_ytd_mapping.values())
        for column in df.columns:
            if column not in er_columns:
                continue

            ytd_all = df[column]
            observed = ytd_all.notna().to_numpy()
            ytd = ytd_all[observed]

            # Flag the first observation of each new calendar year
            years = pd.to_datetime(ytd.index).year.to_numpy()
            year_rolled = np.zeros(len(years), dtype=bool)
            year_rolled[1:] = years[1:] != years[:-1]

            # Change in YTD within a year; the YTD value itself after a reset
            period_returns = ytd.diff().to_numpy(dtype=float, copy=True)
            period_returns[year_rolled] = ytd.to_numpy(dtype=float)[year_rolled]

            # Chain the returns into an index based at 100 (cumprod skips the leading NaN)
            levels = (1 + pd.Series(period_returns) / 100).cumprod() * 100

            # Put the levels back on the original rows; gaps stay NaN
            index_values = np.full(len(df), np.nan)
            index_values[observed] = levels.to_numpy()
            result[f"{column}_index"] = index_values

        return result

    def merge_dfs(self, df1: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:
        """
        Merge two DataFrames with proper date alignment and filling using class parameters

        Args:
            df1: First DataFrame
            df2: Second DataFrame

        Returns:
            Column-wise concatenation of the two frames on the union of their
            indexes, filled according to ``self.fill``
        """
        merged = pd.concat([df1, df2], axis=1)
        return self._apply_fill(merged)

    def clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean specific known data issues defined in bad_dates

        Dates are compared as timestamps, so the repair is applied whether the
        index holds ``datetime.date`` objects, Timestamps or date strings.

        Args:
            df: DataFrame to clean

        Returns:
            Cleaned copy of ``df``; the input is not modified
        """
        # Make a copy to avoid modification warnings
        cleaned_df = df.copy()
        if cleaned_df.empty or not self.bad_dates:
            return cleaned_df

        try:
            index_ts = pd.to_datetime(cleaned_df.index)
        except (ValueError, TypeError):
            # Index is not date-like, so no bad date can match it
            return cleaned_df

        # Process each bad date according to defined actions
        for date, info in self.bad_dates.items():
            column = info['column']
            if column not in cleaned_df.columns:
                continue

            target = pd.Timestamp(date)
            on_date = np.asarray(index_ts == target)
            if not on_date.any():
                continue

            if info['action'] == 'use_previous':
                earlier = np.asarray(index_ts < target)
                if not earlier.any():
                    # No earlier observation to copy from; leave the value as is
                    continue
                prev_value = cleaned_df.loc[earlier, column].iloc[-1]
                cleaned_df.loc[on_date, column] = prev_value

        return cleaned_df

    def get_full_dataset(self) -> pd.DataFrame:
        """
        Get a complete dataset with both price data and excess return indices using class parameters.
        If start_date_align is 'yes', will find the first date where all data is available.

        Returns:
            Complete DataFrame with all requested data
        """
        # Fetch the main price data
        df_ohlc = self.fetch_bloomberg_data(mapping=self.ohlc_mapping)

        # Fetch the excess return YTD data
        er_ytd_df = self.fetch_bloomberg_data(mapping=self.er_ytd_mapping)

        # Convert excess return YTD to index (only for columns in er_ytd_mapping)
        er_index_df = self.convert_er_ytd_to_index(er_ytd_df)

        # Merge all the datasets (this also applies the configured fill)
        final_df = self.merge_dfs(df_ohlc, er_index_df)

        # Clean any known data issues
        final_df = self.clean_data(final_df)

        # If start_date_align is 'yes', keep only rows where all data is available
        if self.start_date_align == 'yes':
            # Find the first date with no NaN values
            non_null_df = final_df.dropna(how='any')
            if not non_null_df.empty:
                first_complete_date = non_null_df.index[0]
                # Filter to only include dates on or after the first complete date
                final_df = final_df[final_df.index >= first_complete_date]

        # Apply any final fill operations specified
        return self._apply_fill(final_df)

In [3]:
# Fetch a fixed historical window, trimmed to the first date on which every
# series has a value (start_date_align='yes').
data_fetcher = DataFetcher(
    start_date='2010-01-01',
    end_date='2015-12-31',
    start_date_align='yes'
)
date_range_df = data_fetcher.get_full_dataset()
# DataFrame.info() prints its own report and returns None, so it is called
# directly; wrapping it in print() would add a stray "None" line.
date_range_df.info()
print(f"Date range: {date_range_df.index.min()} to {date_range_df.index.max()}")

<class 'pandas.core.frame.DataFrame'>
Index: 1565 entries, 2010-01-31 to 2015-12-31
Data columns (total 15 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   cad_oas                 1565 non-null   float64
 1   us_hy_oas               1565 non-null   float64
 2   us_ig_oas               1565 non-null   float64
 3   tsx                     1565 non-null   float64
 4   vix                     1565 non-null   float64
 5   us_3m_10y               1565 non-null   float64
 6   us_growth_surprises     1565 non-null   float64
 7   us_inflation_surprises  1565 non-null   float64
 8   us_lei_yoy              1565 non-null   float64
 9   us_hard_data_surprises  1565 non-null   float64
 10  us_equity_revisions     1565 non-null   float64
 11  us_economic_regime      1565 non-null   float64
 12  cad_ig_er_index         1565 non-null   float64
 13  us_hy_er_index          1565 non-null   float64
 14  us_ig_er_index          1565 n

In [4]:
# Build a fetcher with the defaults, then reconfigure it through
# update_parameters(); start_date_align='no' keeps rows that still contain NaNs.
data_fetcher = DataFetcher()
data_fetcher.update_parameters(
    start_date='2020-01-01',
    end_date='2021-12-31',
    start_date_align='no'
)
updated_df = data_fetcher.get_full_dataset()
# DataFrame.info() prints its own report and returns None, so it is called
# directly; wrapping it in print() would add a stray "None" line.
updated_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 530 entries, 2020-01-01 to 2021-12-31
Data columns (total 15 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   cad_oas                 529 non-null    float64
 1   us_hy_oas               529 non-null    float64
 2   us_ig_oas               529 non-null    float64
 3   tsx                     529 non-null    float64
 4   vix                     529 non-null    float64
 5   us_3m_10y               530 non-null    float64
 6   us_growth_surprises     530 non-null    float64
 7   us_inflation_surprises  530 non-null    float64
 8   us_lei_yoy              508 non-null    float64
 9   us_hard_data_surprises  519 non-null    float64
 10  us_equity_revisions     528 non-null    float64
 11  us_economic_regime      508 non-null    float64
 12  cad_ig_er_index         528 non-null    float64
 13  us_hy_er_index          528 non-null    float64
 14  us_ig_er_index          528 non