In [21]:
import numpy as np
import pandas as pd


def generate_single_ar3_series(n=10000, var=0.000211, seed=None):
    """
    Generates a time series using a single AR(3) process and calculates drawdown metrics.

    Args:
        n (int): Number of observations to generate.
        var (float): Variance of the white noise (at).
        seed (int): Random seed for reproducibility.

    Returns:
        pd.DataFrame: A DataFrame with columns 'ar3', 'duration', and 'magnitude'.
    """
    if seed is not None:
        np.random.seed(seed)

    # AR(3) coefficients
    f0, f1, f2, f3 = 0.25, 0.2, 0.35, -0.1
    noise = np.random.normal(0, np.sqrt(var), n)

    # Initialize the time series
    rt = np.zeros(n)
    rt[:3] = [0.032, 0.020, -0.042]  # Initial values

    # Generate the time series
    for t in range(3, n):
        rt[t] = f0 + f1 * rt[t - 1] + f2 * rt[t - 2] + f3 * rt[t - 3] + noise[t]

    # Calculate drawdown metrics
    max_so_far = rt[0]
    duration = 0
    magnitudes = []
    durations = []

    for i in range(n):
        if rt[i] >= max_so_far:
            max_so_far = rt[i]
            duration = 0  # Reset duration since no drawdown
        else:
            duration += 1

        # Compute magnitude as percentage drop from max_so_far
        magnitude = (max_so_far - rt[i]) / max_so_far if max_so_far != 0 else 0
        magnitudes.append(magnitude)
        durations.append(duration)

    # Create the DataFrame
    df = pd.DataFrame({
        'ar3': rt,
        'magnitude': magnitudes,
        'duration': durations
    })

    return df


def generate_dual_ar3_series(n=10000, var=0.000211, seed=None):
    
    if seed is not None:
        np.random.seed(seed)

    # AR(3) coefficients for the two regimes
    f0, f1, f2, f3 = 0.25, 0.2, 0.35, -0.1  # Regular regime
    f0_alt, f1_alt, f2_alt, f3_alt = 0.0001, 0.25, 0.2, 0.35  # Poor fit regime
    noise = np.random.normal(0, np.sqrt(var), n)

    # Initialize the time series
    rt = np.zeros(n)
    rt[:3] = [0.032, 0.020, -0.042]  # Initial values

    # Generate the time series with regime shifts
    for t in range(3, n):
        if (t // 30) % 2 == 0:  # Every 30 observations, decide the regime
            rt[t] = f0 + f1 * rt[t-1] + f2 * rt[t-2] + f3 * rt[t-3] + noise[t]
        else:
            rt[t] = (
                    f0_alt
                    + f1_alt * rt[t-1]
                    + f2_alt * rt[t-2]
                    + f3_alt * rt[t-3]
                    + noise[t]
            )

    # Calculate drawdown metrics
    max_so_far = rt[0]
    duration = 0
    magnitudes = []
    durations = []

    for i in range(n):
        if rt[i] >= max_so_far:
            max_so_far = rt[i]
            duration = 0  # Reset duration since no drawdown
        else:
            duration += 1

        # Compute magnitude as percentage drop from max_so_far
        magnitude = (max_so_far - rt[i]) / max_so_far if max_so_far != 0 else 0
        magnitudes.append(magnitude)
        durations.append(duration)

    # Create the DataFrame
    df = pd.DataFrame({
        'ar3': rt,
        'magnitude': magnitudes,
        'duration': durations
    })

    return df

In [22]:
import numpy as np


class PrimaryModelRuleBased:
    def __init__(self):
        self.previous_position = 0  # Start with no position

    def predict(self, returns):
        """
        Generate positions based on the rule:
        y_t = 1 if Δr_t > 0, 0 if Δr_t < 0, else y_(t-1).
        
        Args:
            returns (np.ndarray): Time series of returns (r_t).
        
        Returns:
            np.ndarray: Generated positions (y_t).
        """
        positions = np.zeros(len(returns))
        for t in range(1, len(returns)):
            delta_r = returns[t] - returns[t - 1]
            if delta_r > 0:
                positions[t] = 1  # Go long
            elif delta_r < 0:
                positions[t] = 0  # Exit
            else:
                positions[t] = positions[t - 1]  # Keep previous position
        return positions

In [23]:
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import MinMaxScaler


class MetaLabelingModel:
    def __init__(self, X_meta, y_meta, threshold):
        self.secondary_model = GradientBoostingClassifier(random_state=42)
        self.scaler = MinMaxScaler()
        self.threshold = threshold
        self.X_meta = X_meta
        self.y_meta = y_meta

    def fit(self):
        # Normalize the features for meta-labeling
        self.X_meta = self.scaler.fit_transform(self.X_meta)
        self.secondary_model.fit(self.X_meta, self.y_meta)

    def predict(self):
        # Predict probabilities for meta-labeling
        probabilities = self.secondary_model.predict_proba(self.X_meta)[:, 1]  # Get probability for class 1
        return probabilities

    def bet_sizing(self, probabilities):
        # Apply the threshold for final output
        return np.where(probabilities >= self.threshold, 1, 0)

In [24]:
def run_model(include_drawdowns: bool, regime: bool):
    if regime:
        data = generate_dual_ar3_series(n=10000)
    else:
        data = generate_single_ar3_series(n=10000)
    
    primary_model = PrimaryModelRuleBased()
    primary_output = primary_model.predict(data['ar3'])
    
    if include_drawdowns:
        X_meta = data.to_numpy()
    else:
        X_meta = data['ar3'].to_numpy().reshape(-1,1)
    y_meta = primary_output
    
    meta_model = MetaLabelingModel(X_meta, y_meta, threshold=0.9)
    
    meta_model.fit()
    
    meta_output = meta_model.predict()
    final_output = meta_model.bet_sizing(meta_output)
    
    # Calculate the returns from the model's decisions
    strategy_returns = final_output * np.diff(data['ar3'], prepend=0)  # Strategy returns based on positions
    
    # Calculate the Sharpe Ratio
    mean_return = np.mean(strategy_returns)
    std_return = np.std(strategy_returns)
    
    # Assume risk-free rate is 0 for simplicity
    risk_free_rate = 0
    sharpe_ratio = (mean_return - risk_free_rate) / std_return
    
    print(f"Sharpe Ratio: {sharpe_ratio:.4f}")
    
    return sharpe_ratio

In [30]:
model_runs_true = []
model_runs_false = []

for i in range(1,1001):  
    model_runs_true.append(run_model(True, True))
    model_runs_false.append(run_model(False, True))

Sharpe Ratio: 0.1454
Sharpe Ratio: 0.1323
Sharpe Ratio: 0.1348
Sharpe Ratio: 0.1384
Sharpe Ratio: 0.1389
Sharpe Ratio: 0.1325
Sharpe Ratio: 0.1197
Sharpe Ratio: 0.0962
Sharpe Ratio: 0.1240
Sharpe Ratio: 0.1349
Sharpe Ratio: 0.1364
Sharpe Ratio: 0.1538
Sharpe Ratio: 0.1255
Sharpe Ratio: 0.1220
Sharpe Ratio: 0.1390
Sharpe Ratio: 0.1408
Sharpe Ratio: 0.1423
Sharpe Ratio: 0.1447
Sharpe Ratio: 0.1470
Sharpe Ratio: 0.1618
Sharpe Ratio: 0.1525
Sharpe Ratio: 0.1385
Sharpe Ratio: 0.1469
Sharpe Ratio: 0.1060
Sharpe Ratio: 0.1473
Sharpe Ratio: 0.1314
Sharpe Ratio: 0.1052
Sharpe Ratio: 0.1591
Sharpe Ratio: 0.1487
Sharpe Ratio: 0.1293
Sharpe Ratio: 0.1698
Sharpe Ratio: 0.1494
Sharpe Ratio: 0.1297
Sharpe Ratio: 0.1294
Sharpe Ratio: 0.1287
Sharpe Ratio: 0.1025
Sharpe Ratio: 0.1421
Sharpe Ratio: 0.1234
Sharpe Ratio: 0.1276
Sharpe Ratio: 0.1413
Sharpe Ratio: 0.1221
Sharpe Ratio: 0.1268
Sharpe Ratio: 0.0998
Sharpe Ratio: 0.1468
Sharpe Ratio: 0.1424
Sharpe Ratio: 0.1142
Sharpe Ratio: 0.1581
Sharpe Ratio:

In [31]:
np.mean(model_runs_true)

np.float64(0.13048572245362988)

In [32]:
np.mean(model_runs_false)

np.float64(0.1311486315448538)