## Support Functions

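Auxiliary file: helper functions for interval statistics, threshold selection, backtesting, and portfolio statistics.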

In [9]:
import pandas as pd
import numpy as np

In [10]:
def Calc_intervals(series, U):
    """Measure average excursion and waiting times of a series.

    The series is scanned once with a three-state machine:

    * idle  -- waiting for a sample strictly above ``U`` or strictly
      below ``-U``;
    * above -- entered on ``a > U``, left on the first later sample that
      is ``<=`` the series mean;
    * below -- entered on ``a < -U``, left on the first later sample that
      is ``>=`` the series mean.

    The sample that ends an excursion is not re-tested against the
    thresholds in the same step.

    Args:
        series: 1-D sequence of numbers (list, NumPy array or pandas
            Series). Samples are read by position, so a pandas index of
            any kind is ignored.
        U: Threshold. It is compared against the raw values
            (``a > U`` / ``a < -U``), whereas reversion is judged against
            the mean of ``series``.

    Returns:
        Tuple ``(avg_revert_to_mean_time, avg_cross_mean_to_U_time)``:
        the mean number of steps from a threshold crossing to the next
        reversion, and the mean number of steps from a reversion to the
        next threshold crossing. Either entry is ``nan`` when no such
        interval was observed.
    """
    # Positional view of the data: ``series[t]`` on a pandas Series is a
    # label lookup and breaks for any index that is not 0..n-1.
    values = np.asarray(series)
    if values.size == 0:
        return float('nan'), float('nan')

    # Mean is taken on the original object so NaN handling for pandas
    # input stays as it was.
    mu = np.mean(series)

    revert_to_mean_intervals = []
    cross_mean_intervals = []

    state = 0               # +1: above U, -1: below -U, 0: idle
    revert_start = None     # step at which the current excursion began
    cross_start = None      # step of the most recent reversion

    for t, a in enumerate(values):
        if state == 0:
            if a > U:
                state = 1
            elif a < -U:
                state = -1
            else:
                continue
            # A waiting time exists only once a reversion has been seen.
            if cross_start is not None:
                cross_mean_intervals.append(t - cross_start)
            revert_start = t
        elif (state > 0 and a <= mu) or (state < 0 and a >= mu):
            revert_to_mean_intervals.append(t - revert_start)
            cross_start = t
            state = 0

    avg_revert_to_mean_time = (
        np.mean(revert_to_mean_intervals)
        if revert_to_mean_intervals else float('nan')
    )
    avg_cross_mean_to_U_time = (
        np.mean(cross_mean_intervals)
        if cross_mean_intervals else float('nan')
    )

    return avg_revert_to_mean_time, avg_cross_mean_to_U_time
    

In [1]:
def Minimum_Optimized_Profit(series):
    """Grid-search the threshold ``U`` that maximizes a minimum-profit score.

    Candidate thresholds are 0.01, 0.02, ... up to
    ``int(std(series) * 100) / 100``. For each candidate the two average
    times returned by ``Calc_intervals`` are combined into

        U * (len(series) / (revert_time + cross_time) - 1)

    Candidates for which either average is ``nan`` are skipped.

    Args:
        series: Sequence of numbers passed to ``np.std``, ``len`` and
            ``Calc_intervals``.

    Returns:
        The candidate ``U`` with the largest strictly positive score, or
        ``0`` when no candidate scores above zero.
    """
    upper_hundredths = int(np.std(series) * 100)
    n_obs = len(series)

    top_score = 0
    chosen_U = 0

    for hundredths in range(1, upper_hundredths + 1):
        threshold = hundredths / 100
        revert_time, cross_time = Calc_intervals(series, threshold)

        # No complete cycle observed at this threshold.
        if np.isnan(revert_time) or np.isnan(cross_time):
            continue

        score = threshold * ((n_obs / (revert_time + cross_time)) - 1)
        if score > top_score:
            top_score, chosen_U = score, threshold

    return chosen_U


In [4]:
def _position_record(investment_a, investment_b, reserve, profit,
                     portfolio_value, label):
    """Build one output row of the backtest table."""
    return {'Investment_A': investment_a,
            'Investment_B': investment_b,
            'Reserve': reserve,
            'Profit': profit,
            'Portfolio_Val': portfolio_value,
            'Position': label}


def Backtest(data, Z, U, b, Initial_Cash, Max_Leverage):
    """Simulate a two-leg threshold strategy driven by the signal ``Z``.

    One row is produced for every step ``i`` in ``1 .. len(Z) - 1``:

    * Flat: when ``Z`` crosses above ``U`` a "Sell B, Buy A" position is
      opened; when it crosses below ``-U`` a "Sell A, Buy B" position is
      opened; otherwise a ``'None'`` row is recorded.
    * Open: both legs are marked to the current prices and the row is
      labelled ``'Hold'``. "Sell B, Buy A" closes when ``Z`` crosses to
      ``<= 0`` from above; "Sell A, Buy B" closes when ``Z`` crosses to
      ``>= 0`` from below. The closing row is labelled ``'Close'``.
    * The step right after a close emits one more ``'Close'`` row, rolls
      the portfolio value into the cash used for the next trade and
      returns to flat; no entry is evaluated on that step.

    Args:
        data: Object with ``'Stock_A'`` and ``'Stock_B'`` entries that
            support ``.iloc`` (e.g. a DataFrame), aligned with ``Z``.
        Z: Signal supporting ``len`` and ``.iloc``.
        U: Entry threshold applied as ``+U`` / ``-U``.
        b: Ratio of the A leg to the B leg at entry.
        Initial_Cash: Starting cash.
        Max_Leverage: Leverage factor; the B leg is sized as
            ``Initial_Cash / Max_Leverage``.

    Returns:
        DataFrame with columns ``Investment_A``, ``Investment_B``,
        ``Reserve``, ``Profit``, ``Portfolio_Val`` and ``Position``. The
        columns are present even when no rows were produced.
    """
    columns = ['Investment_A', 'Investment_B', 'Reserve', 'Profit',
               'Portfolio_Val', 'Position']
    OpenPosition = None
    records = []

    for i in range(1, len(Z)):  # start at 1 so Z.iloc[i - 1] is valid
        Price_A = data['Stock_A'].iloc[i]
        Price_B = data['Stock_B'].iloc[i]
        z_now = Z.iloc[i]
        z_prev = Z.iloc[i - 1]

        if OpenPosition is None:
            if z_now > U and z_prev <= U:
                OpenPosition = "Sell B, Buy A"
                Entry_price_A = Price_A
                Entry_price_B = Price_B
                Initial_Value_B = Initial_Cash * (1 / Max_Leverage)
                Initial_Value_A = Initial_Value_B * b
                Reserve = Initial_Value_B - Initial_Value_A
                Margin = Initial_Value_B * Max_Leverage
                Portfolio_Value = (Reserve - Initial_Value_B
                                   + Initial_Value_A + Margin)
                position = _position_record(
                    Initial_Value_A, Initial_Value_B, Reserve, 0,
                    Portfolio_Value, OpenPosition)

            elif z_now < -U and z_prev >= -U:
                OpenPosition = "Sell A, Buy B"
                Entry_price_A = Price_A
                Entry_price_B = Price_B
                Initial_Value_B = Initial_Cash * (1 / Max_Leverage)
                Initial_Value_A = Initial_Value_B * b
                # NOTE(review): reserve and margin are defined differently
                # from the "Sell B, Buy A" branch, so the opening portfolio
                # value equals Initial_Cash only when b == 1 or
                # Max_Leverage == 1 -- confirm this is intended.
                Reserve = 0
                Margin = Initial_Value_A * Max_Leverage
                Portfolio_Value = (Reserve + Initial_Value_B
                                   - Initial_Value_A + Margin)
                position = _position_record(
                    Initial_Value_A, Initial_Value_B, Reserve, 0,
                    Portfolio_Value, OpenPosition)

            else:
                Reserve = Initial_Cash
                Margin = 0
                Portfolio_Value = Initial_Cash
                position = _position_record(
                    0, 0, Reserve, 0, Portfolio_Value, 'None')

        elif OpenPosition == "Sell B, Buy A":
            # Mark both legs to the current prices.
            Investment_in_B = (Initial_Value_B / Entry_price_B) * Price_B
            Investment_in_A = (Initial_Value_A / Entry_price_A) * Price_A
            Profit = ((Investment_in_A - Initial_Value_A)
                      + (Initial_Value_B - Investment_in_B))
            Portfolio_Value = (Reserve + Investment_in_A
                               - Investment_in_B + Margin)

            if z_now <= 0 and z_prev > 0:
                OpenPosition = "Close"
                Reserve = Portfolio_Value
                position = _position_record(
                    0, 0, Reserve, Profit, Portfolio_Value, OpenPosition)
            else:
                position = _position_record(
                    Investment_in_A, Investment_in_B, Reserve, Profit,
                    Portfolio_Value, 'Hold')

        elif OpenPosition == "Sell A, Buy B":
            Investment_in_B = (Initial_Value_B / Entry_price_B) * Price_B
            Investment_in_A = (Initial_Value_A / Entry_price_A) * Price_A
            Profit = ((Investment_in_B - Initial_Value_B)
                      + (Initial_Value_A - Investment_in_A))
            Portfolio_Value = (Reserve + Investment_in_B
                               - Investment_in_A + Margin)

            if z_now >= 0 and z_prev < 0:
                OpenPosition = "Close"
                Reserve = Portfolio_Value
                position = _position_record(
                    0, 0, Reserve, Profit, Portfolio_Value, OpenPosition)
            else:
                # Labelled 'Hold' to match the "Sell B, Buy A" branch; this
                # row used to be labelled 'None', which is the flat label.
                position = _position_record(
                    Investment_in_A, Investment_in_B, Reserve, Profit,
                    Portfolio_Value, 'Hold')

        elif OpenPosition == "Close":
            # Settlement step: the closed portfolio value becomes the cash
            # available for the next trade.
            OpenPosition = None
            Initial_Cash = Portfolio_Value
            Reserve = Initial_Cash
            position = _position_record(
                0, 0, Reserve, 0, Portfolio_Value, 'Close')

        records.append(position)

    # Passing the columns keeps the schema stable when ``records`` is empty.
    return pd.DataFrame(records, columns=columns)

In [1]:
def portfolio_statistics(portfolio_value, risk_free_rate):
    """Compute annualized return, annualized volatility and Sharpe ratio.

    Args:
        portfolio_value: pandas object supporting ``len``, ``.iloc`` and
            ``.pct_change()`` that holds the portfolio value per step.
        risk_free_rate: Rate subtracted from the annualized return in the
            Sharpe ratio.

    Returns:
        Tuple ``(annualized_return, annualized_volatility, sharpe)`` of
        plain floats.
    """
    n_obs = len(portfolio_value)

    # Return over the whole sample, first value to last value.
    total_return = portfolio_value.iloc[-1] / portfolio_value.iloc[0] - 1

    def _unwrap(x):
        # Entries that arrive as NumPy arrays are reduced to their first item.
        if isinstance(x, np.ndarray):
            return x[0]
        return x

    step_returns = portfolio_value.pct_change().dropna().apply(_unwrap)

    # NOTE(review): volatility is scaled with 252 * 6 steps per year while
    # the return exponent below uses 365 days with 6 steps per day --
    # presumably 6 bars per day; confirm the mixed day counts are intended.
    annual_vol = np.std(step_returns) * np.sqrt(252 * 6)

    exponent = 365 / (n_obs / 6)
    annual_return = (1 + total_return) ** exponent - 1

    sharpe_ratio = (annual_return - risk_free_rate) / annual_vol

    return float(annual_return), float(annual_vol), float(sharpe_ratio)
