In [1]:
from RelativeValue import RelativeValue
RVStrat = RelativeValue('FX Mexico', '5Y')

  from pandas.core.computation.check import NUMEXPR_INSTALLED
  from pandas.core import (


In [2]:
RVStrat.get_residuals()

Getting PCA residuals ...
Residuals successfully computed


In [3]:
RVStrat.stress_test()

STRESS TEST
----------------------------------------------------------------------------------------------------
Updating tradable tenors...
Rebalancing Stress test...
Window Stress test...
Signal Persistance test...


In [4]:
RVStrat.backtest(window=21*12, vol_window=21)

BACKTEST
----------------------------------------------------------------------------------------------------
  Parameters | Confidence: 0.5 | Buffer: 0.0 | Window: 252 | Volatility Window: 21 | Volatility Target: 21


In [None]:
import pandas as pd
import numpy as np
import datetime as dt
import json
from sklearn.decomposition import PCA
from scipy.stats import norm
from dateutil.relativedelta import relativedelta
from Dates import from_excel_date, bump_date
from PCA import yield_curve_decomposition

# ==========================================
# 1. Helper Functions & Utilities
# ==========================================


def align_pca_signs(res: dict) -> dict:
    """
    Enforces consistent sign orientation for PCA factors to prevent 
    flipping between rolling windows.
    """

    loadings = res['loadings']
    scores = res['scores']
    
    # 1. Level: Force sum of loadings to be positive (Rates UP = Score UP)
    if loadings['Level'].sum() < 0:
        loadings['Level'] *= -1
        scores['Level'] *= -1

    # 2. Slope: Force standard curve shape (10Y > 2Y means Higher Slope Score)
    # Check if last tenor loading is greater than first tenor loading
    if loadings['Slope'].iloc[-1] < loadings['Slope'].iloc[0]:
        loadings['Slope'] *= -1
        scores['Slope'] *= -1
        
    # 3. Curvature: Force "Humped" shape (Belly < Wings) for positive score
    mid_idx = len(loadings) // 2
    if loadings['Curvature'].iloc[mid_idx] > loadings['Curvature'].iloc[0]:
         loadings['Curvature'] *= -1
         scores['Curvature'] *= -1
         
    res['loadings'] = loadings
    res['scores'] = scores
    return res


# ==========================================
# 2. Main Trading Class
# ==========================================

class FactorTrading:

    def __init__(self, country: str, lookback: str = '5Y'):
        # Load Config
        with open('config.json') as f:
            config = json.load(f)[country]
        self.tenors = config["Tenors"]

        self.rates = pd.read_excel('data/Rates.xlsx', index_col=0, sheet_name=config["Curve Name"])[self.tenors]
        self.rates.index.rename('Date', inplace=True)
        self.rates.drop('Ticker',inplace=True)
        self.rates.index = self.rates.index.map(from_excel_date)
        self.rates.index = [d.date() for d in self.rates.index]
        self.rates.dropna(inplace=True)
        self.rates /= 100
            
        self.country = country
        self.lookback = lookback
        self.tenors = config["Tenors"]
        self.vol_window = config.get("Volatility Window", 60)
        
            
        self.rates.dropna(inplace=True)
        
        # Containers for outputs
        self.residuals = None
        self.pca_scores = None
        self.signals = None

    def get_residuals(self):
        """Calculates rolling PCA scores and residuals."""
        print(f"="*80)
        print(f"Calculating Rolling PCA (Lookback: {self.lookback})...")

        # Determine start date based on lookback from the first available data point
        # OR define a simulation start date. Here we process the whole history 
        # where we have enough data for the lookback.
        
        # Find first date where we have enough history
        first_valid_idx = bump_date(self.rates.index[0], self.lookback)
        dates_to_process = self.rates.loc[first_valid_idx:].index

        self.residuals = pd.DataFrame(index=dates_to_process, columns=self.rates.columns)
        self.pca_scores = pd.DataFrame(index=dates_to_process, columns=['Level', 'Slope', 'Curvature'])
        
        for d in dates_to_process:
            d_date = d.date() if isinstance(d, pd.Timestamp) else d
            start_date = bump_date(d_date, '-' + self.lookback)
            
            # Slice the window
            yield_window = self.rates[(self.rates.index >= start_date) & (self.rates.index <= d)]
            
            if len(yield_window) < 20: # Safety check for enough data
                continue

            # Decompose
            res = yield_curve_decomposition(yield_window)
            # *** CRITICAL: Align Signs ***
            res = align_pca_signs(res)
            
            # Store results (taking the last point 'd' from the window)
            self.residuals.loc[d] = res['residuals'].iloc[-1]
            self.pca_scores.loc[d] = res['scores'].loc[d].values

        # Ensure types are float
        self.pca_scores = self.pca_scores.astype(float)
        self.volatilities = self.pca_scores.diff().rolling(window=21*3).std()
        self.normalization_factor = ((1/self.volatilities).div((1/self.volatilities).sum(axis=1), axis='rows'))

        print(f"PCA computation complete. Processed {len(dates_to_process)} dates.")
        print(f"="*80)

    def generate_signals(self):
        """
        Generates trading signals based on PCA scores.
        """
        if self.pca_scores is None:
            print("Error: Run get_residuals() first.")
            return

        print(f"Generating Factor Signals...")
        self.signals = pd.DataFrame(0, index=self.pca_scores.index, columns=['Level', 'Slope', 'Curvature'])
        
        # --- 1. LEVEL (PC1): Trend Following ---
        # Logic: EMA Crossover on the Score
        # Note: If aligned correctly, Higher Score = Higher Yields.
        # Trend Down in Score = Yields Falling = Bond Price Rising -> Buy Signal
        
        fast = 30
        slow = 90
        
        level_s = self.pca_scores['Level']
        ema_fast = level_s.ewm(span=fast).mean()
        ema_slow = level_s.ewm(span=slow).mean()
        
        # If Fast < Slow (Yields trending down) -> Long Signal (1)
        self.signals['Level'] = np.where(ema_fast < ema_slow, 1, -1)

        # --- 2. SLOPE (PC2): Carry-Adjusted Mean Reversion ---
        # Logic: Fade extremes only if carry is supportive.
        
        slope_window = 60
        slope_s = self.pca_scores['Slope']
        z_slope = (slope_s - slope_s.rolling(slope_window).mean()) / slope_s.rolling(slope_window).std()
        
        # Calculate Carry Proxy (2s10s Spread)
        # Using columns by position (usually 10Y is last, 2Y is 2nd or similar)
        # Adjust 'iloc' based on your specific 'Tenors' list in config
        spread_2s10s = self.rates.iloc[:, -1] - self.rates.iloc[:, 1]
        # Align spread index to score index
        spread_2s10s = spread_2s10s.reindex(self.pca_scores.index)
        
        # Thresholds
        z_thresh = 1.64
        
        # Short Slope (Flattening Trade)
        # Trigger: Z > 2 (Steep) AND Spread > 0 (Positive Carry to hold flattener/short spread?)
        # Note: In rates, "Short Spread" usually means Long 10Y, Short 2Y.
        # If Spread is Positive (Normal curve), Shorting it usually has Negative Carry (pay high cpn, receive low).
        # STRICTLY SPEAKING: Flattener (Long 10Y / Short 2Y) has NEGATIVE carry in normal curve.
        # Steepener (Short 10Y / Long 2Y) has POSITIVE carry in normal curve.
        # Let's assume you want "Positive Carry" for the direction you trade.
        
        # Let's simplify to pure Z-score for the code, you can refine the carry math:
        self.signals.loc[z_slope > z_thresh, 'Slope'] = -1 # Flatten
        self.signals.loc[z_slope < -z_thresh, 'Slope'] = 1 # Steepen

        # --- 3. CURVATURE (PC3): Mean Reversion ---
        # Logic: 2s5s10s Fly. Stationary.
        
        curv_window = 60
        curv_s = self.pca_scores['Curvature']
        z_curv = (curv_s - curv_s.rolling(curv_window).mean()) / curv_s.rolling(curv_window).std()
        
        self.signals.loc[z_curv > 1.64, 'Curvature'] = -1 # Short Fly (Bet on richening/flattening of belly)
        self.signals.loc[z_curv < -1.64, 'Curvature'] = 1 # Long Fly
        
        print("Signals generated.")
        print("="*80)
        return self.signals
    
    def estimate_dv01(self, yield_val, tenor_str):
        """
        Approximates DV01 (Dollar Value of 01) per $1 notional.
        Uses Modified Duration approximation for a par bond.
        """
        # Parse tenor to years
        if 'Y' in tenor_str:
            t = int(tenor_str.replace('Y', ''))
        elif 'M' in tenor_str:
            t = int(tenor_str.replace('M', '')) / 12
        else:
            t = 1 # Fallback
            
        # Mod Duration approx for par bond: (1 - (1+y/f)^(-t*f)) / y
        # Using frequency = 1 for simplicity
        if yield_val < 0.0001: yield_val = 0.0001 # Avoid div by zero
        
        m_dur = (1 - (1 + yield_val)**(-t)) / yield_val
        
        # DV01 = ModDur * Price * 0.0001
        # Price approx 1 (Par)
        return m_dur * 0.0001

    def backtest(self):
        """
        Computes P&L for Level, Slope, and Curvature strategies.
        Includes DV01 hedging and Transaction Costs.
        """
        if self.signals is None:
            print("Run generate_signals() first.")
            return

        print(f"Running Backtest...")
        
        # 1. Align Data
        # We trade at Close of Day T based on Signal T (or Open of T+1). 
        # For simplicity: We enter at Close T, realize PnL on T+1 changes.
        market_data = self.rates.loc[self.signals.index]
        yield_changes = market_data.diff().shift(-1) # return at T+1
        yield_changes.dropna(inplace = True)
        
        # Initialize PnL Series
        pnl = pd.DataFrame(0.0, index=self.signals.index, columns=['Level', 'Slope', 'Curvature'])
        
        # Cost assumptions (in basis points of yield spread)
        tc_bps = 0.0
        
        # Loop through days to calculate dynamic hedge ratios
        # (Vectorization is harder here due to changing DV01s)
        
        for date, row in self.signals.iterrows():
            if date not in yield_changes.index: continue
            normalization_factors = self.normalization_factor.loc[date]
            # Current Yields
            y_2y = market_data.loc[date].iloc[1] # Adjust index based on your cols
            y_5y = market_data.loc[date].iloc[2] # Adjust index based on your cols
            y_10y = market_data.loc[date].iloc[-1]
            
            # Current DV01s
            dv01_2y = self.estimate_dv01(y_2y, "2Y")
            dv01_5y = self.estimate_dv01(y_5y, "5Y")
            dv01_10y = self.estimate_dv01(y_10y, "10Y")
            
            # Next Day Yield Change (bps)
            dy_2y = yield_changes.loc[date].iloc[1] * 10000
            dy_5y = yield_changes.loc[date].iloc[2] * 10000
            dy_10y = yield_changes.loc[date].iloc[-1] * 10000
            
            # --- STRATEGY 1: LEVEL (10Y Only) ---
            # Signal: 1 (Long Bonds / Short Yields), -1 (Short Bonds / Long Yields)
            sig_lvl = row['Level']*normalization_factors['Level']
            # PnL = -Signal * DV01 * Change
            # If Signal 1 (Long), Yield Down (Neg Change) -> Positive PnL
            pnl.loc[date, 'Level'] = -1 * sig_lvl * dv01_5y * dy_5y
            
            
            # --- STRATEGY 2: SLOPE (2s10s DV01 Neutral) ---
            # Signal: 1 (Steepener), -1 (Flattener)
            # Steepener: Long 2Y, Short 10Y
            sig_slope = row['Slope']*normalization_factors['Slope']
            
            if sig_slope != 0:
                # Hedge Ratio: How many 2Ys for one 10Y?
                # ratio * dv01_2y = 1 * dv01_10y
                hedge_ratio = dv01_10y / dv01_2y
                
                # Steepener (1): Long Ratio 2Y, Short 1 Unit 10Y
                # Flattener (-1): Short Ratio 2Y, Long 1 Unit 10Y
                
                leg_2y_pnl = (- sig_slope * hedge_ratio) * (-1 * dv01_2y * dy_2y)
                leg_10y_pnl = (sig_slope) * (-1 * dv01_10y * dy_10y)
                
                pnl.loc[date, 'Slope'] = (leg_2y_pnl + leg_10y_pnl)


            # --- STRATEGY 3: CURVATURE (2s5s10s Fly DV01 Neutral) ---
            # Signal: 1 (Long Fly: Long Wings, Short Belly), -1 (Short Fly)
            # Note: Previously we defined Signal 1 as "Long Fly" (Betting on mean reversion).
            # Usually Long Fly = Long Wings / Short Belly.
            
            sig_curv = row['Curvature']*normalization_factors['Curvature']
            
            if sig_curv != 0:
                # Weights: 50% risk in 2Y, 50% risk in 10Y to hedge 1 unit of 5Y
                # w_2y * dv01_2y = 0.5 * dv01_5y  -> w_2y = 0.5 * dv01_5y / dv01_2y
                # w_10y * dv01_10y = 0.5 * dv01_5y -> w_10y = 0.5 * dv01_5y / dv01_10y
                
                w_2y = (0.5 * dv01_5y) / dv01_2y
                w_10y = (0.5 * dv01_5y) / dv01_10y
                
                # If Signal 1 (Long Fly): Long w_2y, Long w_10y, Short 1 unit 5Y
                dir = sig_curv 
                
                pnl_2y = (-dir * w_2y) * (-1 * dv01_2y * dy_2y)
                pnl_10y = (-dir * w_10y) * (-1 * dv01_10y * dy_10y)
                pnl_5y = (dir) * (-1 * dv01_5y * dy_5y)
                
                pnl.loc[date, 'Curvature'] = (pnl_2y + pnl_10y + pnl_5y)

        # --- APPLY TRANSACTION COSTS ---
        # Calculate Turnover: Absolute change in signal
        turnover = self.signals.diff().abs().fillna(0)
        
        # Cost approximation: Turnover * TC_BPS * Risk_Factor
        # We assume 1 unit of risk (approx DV01 of 10Y) for normalization
        avg_dv01 = 0.0008 # approx 8 cents per bp
        costs = turnover * tc_bps * avg_dv01
        
        net_pnl = pnl - costs
        
        print("Backtest Complete.")
        
        # Cumulative PnL
        return net_pnl.cumsum()

In [None]:
strat = FactorTrading(country="Chile", lookback="2Y")

# 1. Compute Rolling PCA
strat.get_residuals()

# 2. Generate Signals
sigs = strat.generate_signals()

# 3. Backtest
pnl = strat.backtest()

In [None]:
IL = pnl.sum(axis=1)
IL.index = pd.to_datetime(IL.index)
IL.plot()
plt.show()