In [1]:
import pandas as pd
import numpy as np
import statistics

In [2]:
def calculate_volatility(prices, delta, timespan=30):
    """
    This function calculates price volatility as the exponentially weighted
    standard deviation of lagged simple returns.

    For every timestamp t the reference observation is the last one that lies
    strictly before (t - delta); the return between the two prices is computed
    and the exponentially weighted standard deviation of these returns is taken.

    Parameters:
    prices (series): Series of prices where the index should be a sorted pandas timestamp index
    delta (pd.Timedelta): Time gap used to locate the reference price
    timespan (integer): Span of the exponentially weighted window, default is 30

    Returns:
    vol (series): Calculated volatility, indexed by the timestamps for which a
                  reference observation exists (the first value is NaN because
                  a standard deviation needs at least two returns)
    """

    # insertion position of (t - delta) in the index; 0 means nothing earlier exists
    positions = prices.index.searchsorted(prices.index - delta)
    positions = positions[positions > 0]

    # map every usable timestamp t (index) to its reference timestamp (value)
    reference = pd.Series(
        prices.index[positions - 1],
        index=prices.index[prices.shape[0] - positions.shape[0]:],
    )

    # simple return between the price at t and the price at the reference timestamp
    returns = prices.loc[reference.index] / prices.loc[reference.values].values - 1

    return returns.ewm(span=timespan).std()

In [3]:
def get_vol(prices, delta, span=100):
    """
    This function estimates volatility from returns measured over a given timedelta

    Parameters:
    prices (series): Series of prices where the index should be a pandas timestamp
    delta (pd.Timedelta): Time gap used to locate the earlier reference price
    span (integer): Span of the exponentially weighted window, default value is 100

    Returns:
    (series): exponentially weighted standard deviation of the returns, to be
              added to the target dataframe
    """

    timestamps = prices.index

    # where would (t - delta) be inserted? position 0 means there is no earlier observation
    lag_positions = timestamps.searchsorted(timestamps - delta)
    lag_positions = lag_positions[lag_positions > 0]

    # values: reference timestamps, index: the timestamps they belong to
    first_usable = prices.shape[0] - lag_positions.shape[0]
    reference = pd.Series(timestamps[lag_positions - 1], index=timestamps[first_usable:])

    # return between the current price and the reference price
    current_prices = prices.loc[reference.index]
    reference_prices = prices.loc[reference.values].values
    lagged_returns = current_prices / reference_prices - 1

    # exponentially weighted standard deviation of those returns
    return lagged_returns.ewm(span=span).std()

In [4]:
def fixed_time_horizon_not_dynamic(data, threshold):

    """
    This function assigns labels to the data points according to the fixed time horizon method

    The relative return between the price of a row and the price found at the
    index label stored in its "horizon" column is compared with the threshold.

    Parameters:
    data (dataframe): dataframe containing the columns "Price" and "horizon";
                      every horizon must be a label of the dataframe index
    threshold (float): threshold for labeling a datapoint

    Returns:
    data (dataframe): the same dataframe (modified in place) with an extra column
                      "fth_label": 1 if the return exceeds the threshold, -1 if it
                      is below -threshold, 0 if it lies in between, and "" when
                      the return is undefined (NaN)

    Raises:
    KeyError: if a horizon is not present in the index
    ValueError: if a horizon matches more than one row of the index
    """

    # look up the price at every horizon in a single pass; .loc with a list of
    # labels raises KeyError for labels that are missing from the index
    horizon_prices = data["Price"].loc[pd.Index(data["horizon"])]
    if len(horizon_prices) != len(data):
        raise ValueError("every horizon must match exactly one row of the index")

    # relative return within the fixed horizon
    price_t1 = horizon_prices.to_numpy(dtype=float)
    price_t0 = data["Price"].to_numpy(dtype=float)
    relative_return = price_t1 / price_t0 - 1

    # object dtype on purpose: the column mixes integer labels with the ""
    # placeholder that is kept for rows whose return is NaN
    labels = np.full(len(data), "", dtype=object)
    labels[relative_return > threshold] = 1
    labels[relative_return < -threshold] = -1
    labels[(relative_return >= -threshold) & (relative_return <= threshold)] = 0

    # the column is only written once everything succeeded, so a failed lookup
    # leaves the caller's dataframe untouched
    data["fth_label"] = labels

    return data

In [5]:
def apply_fixed_time_horizon(dictionary, threshold):
    """
    Label every currency in the dictionary with the fixed time horizon method

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe
    threshold (float): threshold passed on to fixed_time_horizon_not_dynamic

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the labeled dataframe
    """

    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = fixed_time_horizon_not_dynamic(frame, threshold)

    return dictionary

In [6]:
def get_touches(prices, events, factors=[1, 1]):
    '''
    Find, for every event, the earliest timestamps at which the price path
    between the event start and its horizon crosses the lower / upper barrier.

      prices: price series indexed like events
      events: pd dataframe with columns
        horizon: timestamp of the next horizon
        threshold: unit height of top and bottom barriers
        side: the side of each bet
      factors: multipliers of the threshold to set the height of
               top/bottom barriers; a factor <= 0 disables that barrier

      returns: copy of events[['horizon']] with the added columns
        stop_loss: first timestamp with a return below the lower barrier
        take_profit: first timestamp with a return above the upper barrier
        (missing when the barrier is never crossed)
    '''

    out = events[['horizon']].copy(deep=True)

    # a disabled barrier is an all-NaN series: comparisons against NaN are never True
    if factors[0] > 0:
        thresh_uppr = factors[0] * events['threshold']
    else:
        thresh_uppr = pd.Series(index=events.index)

    if factors[1] > 0:
        thresh_lwr = -factors[1] * events['threshold']
    else:
        thresh_lwr = pd.Series(index=events.index)

    for start, horizon in events['horizon'].items():
        # returns along the path, signed by the side of the bet
        path = prices[start:horizon]
        path_returns = (path / prices[start] - 1) * events.side[start]

        below_lower = path_returns[path_returns < thresh_lwr[start]]
        out.loc[start, 'stop_loss'] = below_lower.index.min()      # earliest stop loss

        above_upper = path_returns[path_returns > thresh_uppr[start]]
        out.loc[start, 'take_profit'] = above_upper.index.min()    # earliest take profit

    return out

In [7]:
def get_labels(touches):
    """
    Turn barrier touches into labels

    Parameters:
    touches (dataframe): dataframe with the columns "stop_loss" and "take_profit"

    Returns:
    out (dataframe): copy of touches with an extra column "label":
                     0 if no barrier was touched, -1 if the stop loss was
                     touched first (or at the same time), 1 otherwise
    """
    out = touches.copy(deep=True)

    # DataFrame.min skips NaN, so a row is only null when both barriers are untouched
    earliest = touches[['stop_loss', 'take_profit']].min(axis=1)

    for loc, moment in earliest.items():
        if pd.isnull(moment):
            label = 0
        elif moment == touches.loc[loc, 'stop_loss']:
            label = -1
        else:
            label = 1
        out.loc[loc, 'label'] = label

    return out

In [8]:
def apply_tbm_to_currency_old(data, delta):
    """
    Label one currency with the triple barrier method (older variant that
    drops every row containing a missing value after adding the threshold)

    Parameters:
    data (dataframe): dataframe with the columns "Price" and "horizon"
                      (the horizon column must already exist)
    delta (pd.Timedelta): time gap passed on to get_vol

    Returns:
    data (dataframe): new dataframe with the extra columns "threshold" and "tbm_label"
    """
    # volatility serves as the unit height of the barriers
    volatility = get_vol(data.Price, delta)
    data = data.assign(threshold=volatility).dropna()

    events = data[['horizon', 'threshold']]
    long_only = pd.Series(1., events.index)  # long only
    events = events.assign(side=long_only)

    touches = get_labels(get_touches(data.Price, events, [1, 1]))

    return data.assign(tbm_label=touches.label)

In [9]:
def apply_tbm_to_currency(data, delta):
    """
    Label one currency with the triple barrier method

    Rows without a volatility estimate are kept (no dropna), so their
    threshold is NaN.

    Parameters:
    data (dataframe): dataframe with the columns "Price" and "horizon"
                      (the horizon column must already exist)
    delta (pd.Timedelta): time gap passed on to get_vol

    Returns:
    data (dataframe): new dataframe with the extra columns "threshold" and "tbm_label"
    """
    # volatility serves as the unit height of the barriers
    volatility = get_vol(data.Price, delta)
    data = data.assign(threshold=volatility)

    events = data[['horizon', 'threshold']]
    long_only = pd.Series(1., events.index)  # long only
    events = events.assign(side=long_only)

    touches = get_labels(get_touches(data.Price, events, [1, 1]))

    return data.assign(tbm_label=touches.label)

In [10]:
def apply_tbm(dictionary, volatility_delta):
    """
    Assign the triple barrier label to every currency in the dictionary

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe
    volatility_delta (pd.Timedelta): time gap passed on to apply_tbm_to_currency

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the labeled dataframe
    """

    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = apply_tbm_to_currency(frame, volatility_delta)

    return dictionary

In [11]:
def calculate_relative_return(df):
    """
    This function calculates the relative return within the horizon

    For every row the price found at the index label stored in its "horizon"
    column is divided by the row's own price, minus one.

    Parameters:
    df (dataframe): dataframe with the columns "Price" and "horizon" where the
                    index is a timestamp and every horizon is a label of the index

    Returns:
    df (dataframe): the same dataframe (modified in place) with an extra float
                    column "relative_return"

    Raises:
    KeyError: if a horizon is not present in the index
    ValueError: if a horizon matches more than one row of the index
    """

    # look up the price at every horizon in a single pass; .loc with a list of
    # labels raises KeyError for labels that are missing from the index
    horizon_prices = df["Price"].loc[pd.Index(df["horizon"])]
    if len(horizon_prices) != len(df):
        raise ValueError("every horizon must match exactly one row of the index")

    price_t1 = horizon_prices.to_numpy(dtype=float)
    price_t0 = df["Price"].to_numpy(dtype=float)

    # plain float column (no "" placeholder), so quantiles and comparisons
    # downstream operate on numeric data
    df["relative_return"] = price_t1 / price_t0 - 1

    return df

In [12]:
def assign_relative_returns(dictionary):
    """
    Add the relative return within the horizon to every currency dataframe

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the dataframe
                       returned from calculate_relative_return
    """

    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = calculate_relative_return(frame)

    return dictionary

In [13]:
def get_market_return(dictionary, index):
    """
    Collect the relative return of every currency at a given row position

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe, each with a
                             "relative_return" column
    index (int): row position (counted from the start of each dataframe)

    Returns:
    market_returns (list): relative returns of all currencies that have a row at
                           that position; shorter dataframes are skipped
    """

    # NOTE(review): rows are matched by position from the start of each frame,
    # not by timestamp - confirm that all frames start at the same date
    return [
        frame.iloc[index].relative_return
        for frame in dictionary.values()
        if len(frame) > index
    ]

In [14]:
def calculate_mean_median_market_return(dictionary):
    """
    Calculate the market-wide mean and median relative return for every row position

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe; must contain
                             the key "Bitcoin", whose length sets the number of rows

    Returns:
    mean_returns (list): mean return across the currencies for every row position
    median_returns (list): median return across the currencies for every row position
    """

    mean_returns, median_returns = [], []

    # Bitcoin is the reference because it is one of the oldest currencies
    for position in range(len(dictionary["Bitcoin"])):
        # returns of all currencies that have data at this row position
        snapshot = get_market_return(dictionary, position)
        mean_returns.append(statistics.mean(snapshot))
        median_returns.append(statistics.median(snapshot))

    return mean_returns, median_returns

In [15]:
def label_excess_over_mean_median(df, mean_returns, median_returns):
    """
    This function labels every datapoint by comparing its relative return with
    the market mean and the market median return

    The market lists are aligned with the dataframe from the end: only their
    last len(df) entries are used.

    Parameters:
    df (dataframe): crypto currency data with a "relative_return" column
    mean_returns (list): market mean return per data point
    median_returns (list): market median return per data point

    Returns:
    df (dataframe): same input df (modified in place) with the columns
                    "excess_over_mean" and "excess_over_median":
                    1 above the market value, -1 below, 0 if equal

    Raises:
    ValueError: if the dataframe has more rows than a market list
    """

    targets = (
        ("excess_over_mean", mean_returns),
        ("excess_over_median", median_returns),
    )
    n_rows = len(df)

    for column, market in targets:
        if n_rows > len(market):
            raise ValueError(
                f"df has {n_rows} rows but only {len(market)} market returns are available for {column}"
            )

    # nothing to compare for an empty frame; just make sure the columns exist
    if n_rows == 0:
        for column, _ in targets:
            if column not in df.columns:
                df[column] = np.nan
        return df

    for column, market in targets:
        # explicit start offset: a negative slice like market[-0:] would return
        # the whole list instead of an empty one
        aligned = market[len(market) - n_rows:]

        df.loc[df.relative_return > aligned, column] = 1
        df.loc[df.relative_return < aligned, column] = -1
        df.loc[df.relative_return == aligned, column] = 0

    return df

In [16]:
def assign_excess_over_mean_median_label(dictionary, mean_returns, median_returns):
    """
    Add the excess-over-mean and excess-over-median labels to every currency

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe
    mean_returns (list): market mean return per data point
    median_returns (list): market median return per data point

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the labeled dataframe
    """
    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = label_excess_over_mean_median(frame, mean_returns, median_returns)
    return dictionary

In [17]:
def label_tail_sets(df):
    """
    Label every datapoint by the tail of the return distribution it falls into

    Parameters:
    df (dataframe): dataframe with a "relative_return" column

    Returns:
    df (dataframe): the same dataframe (modified in place) with an extra integer
                    column "tail_sets": 1 above the 75% quantile of the relative
                    returns, -1 below the 25% quantile, 0 otherwise (including
                    missing returns)
    """

    # TODO: strictly speaking the volatility-adjusted relative return should be
    # used here (see the theory section)

    returns = df["relative_return"]
    upper_threshold = returns.quantile(0.75)
    lower_threshold = returns.quantile(0.25)

    # vectorised masks instead of a row-by-row apply; comparisons with NaN are
    # False, so missing returns keep the default label 0
    labels = np.zeros(len(df), dtype="int64")
    labels[(returns > upper_threshold).to_numpy()] = 1
    labels[(returns < lower_threshold).to_numpy()] = -1

    df["tail_sets"] = labels
    return df

In [18]:
def assign_tail_sets(dictionary):
    """
    Add the tail set label to every currency dataframe

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the labeled dataframe
    """
    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = label_tail_sets(frame)
    return dictionary

In [19]:
# Regression slope and its t-value
def calculate_t_value_alt(y):
    """
    Regress y on a time index 0..len(y)-1 (with intercept) and return the
    t-value of the slope

    Parameters:
    y (array): observed values

    Returns:
    t_value (float): slope divided by its standard error, or NaN when it cannot
                     be computed (two or fewer observations, constant values,
                     zero residuals or zero standard error)
    """
    n_obs = len(y)
    X = np.arange(n_obs)
    X = np.vstack([X, np.ones(n_obs)]).T  # add the constant
    beta, _, _, _ = np.linalg.lstsq(X, y, rcond=None)

    residuals = y - X @ beta
    sse = np.sum(residuals**2)
    sst = np.sum((y - np.mean(y))**2)

    if sst == 0 or sse == 0 or n_obs <= 2:
        return np.nan

    # standard error of the slope: residual standard deviation over the
    # spread of the time index
    se = np.sqrt(sse / (n_obs - 2)) / np.sqrt(np.sum((X[:, 0] - np.mean(X[:, 0])) ** 2))

    if se == 0:
        return np.nan

    return beta[0] / se


# Trend scanning
def trend_scanning_alt(data, max_l):
    """
    Label every data point with the direction of the most significant forward trend

    For each position t the prices of t..t+l are regressed on time for
    l = 1..max_l; the t-value with the largest absolute value decides the label.

    Parameters:
    data (dataframe): dataframe with a "Price" column
    max_l (int): maximum look-forward period

    Returns:
    labels (list): one label per row: 1 (t-value > 2), -1 (t-value < -2) or 0;
                   the last max_l rows are always 0 (no trend)
    """
    prices = data['Price'].values
    labels = []

    for t in range(len(prices) - max_l):
        # None means "no usable regression yet"; a numeric start value such as
        # -inf would win every absolute-value comparison
        best_t_value = None

        for l in range(1, max_l + 1):
            y = prices[t:t + l + 1]
            if len(y) <= 2 or np.any(np.isnan(y)) or np.any(np.isinf(y)):
                continue

            t_value = calculate_t_value_alt(y)
            if np.isnan(t_value):
                continue
            if best_t_value is None or abs(t_value) > abs(best_t_value):
                best_t_value = t_value

        if best_t_value is None:
            labels.append(0)
        elif best_t_value > 2:
            labels.append(1)
        elif best_t_value < -2:
            labels.append(-1)
        else:
            labels.append(0)

    # pad with 0 (no trend) so that there is exactly one label per row, also
    # when the data is shorter than max_l
    labels.extend([0] * (len(prices) - len(labels)))
    return labels


In [20]:
def trend_scanning(df, max_L=10, threshold_up=1.96, threshold_down=-1.96):
    """
    Label every data point with the direction of the most significant forward trend

    For each position t (starting at max_L) the prices x[t:t+L] are regressed
    on time for L = 1..max_L; the slope t-value with the largest absolute value
    decides the label.

    Parameters:
    df (dataframe): dataframe with a "Price" column
    max_L (int): maximum number of forward observations per regression
    threshold_up (float): t-value above which the point is labeled 1
    threshold_down (float): t-value below which the point is labeled -1

    Returns:
    df (dataframe): the same dataframe (modified in place) with an extra column
                    "trend" holding 1, -1 or 0; points without any usable
                    regression window are labeled 0
    """
    # Extract the price series
    x = df['Price'].values
    T = len(x)

    # Initialize labels
    y = np.zeros(T)

    def slope_t_value(window):
        """t-value of the OLS slope of window against 0..len-1, or None if undefined."""
        n_obs = len(window)
        if n_obs <= 2:
            return None  # no degrees of freedom left for the residual variance

        time_index = np.arange(n_obs)
        X = np.vstack([np.ones(n_obs), time_index]).T
        beta = np.linalg.pinv(X.T @ X) @ X.T @ window
        residuals = window - X @ beta
        residual_sum_squares = np.sum(residuals**2)
        total_variance = np.sum((time_index - np.mean(time_index))**2)
        if total_variance == 0:
            return None

        sigma_beta = np.sqrt(residual_sum_squares / (n_obs - 2) / total_variance)
        # a zero or non-finite standard error (perfect fit, NaN prices) gives no usable t-value
        if sigma_beta == 0 or not np.isfinite(sigma_beta):
            return None
        return beta[1] / sigma_beta

    # NOTE(review): the scan starts at index max_L although the windows look
    # forward, so the first max_L rows keep the label 0 - confirm this is intended
    for t in range(max_L, T):
        best_t_value = None

        # Try different look-forward periods
        for L in range(1, max_L + 1):
            if t + L > T:
                break  # longer windows would run past the end of the series

            t_value = slope_t_value(x[t:t + L])
            if t_value is None:
                continue

            # keep the most significant trend regardless of its direction
            if best_t_value is None or abs(t_value) > abs(best_t_value):
                best_t_value = t_value

        # without any usable window the point stays at 0 (no trend)
        if best_t_value is None:
            y[t] = 0
        elif best_t_value > threshold_up:
            y[t] = 1
        elif best_t_value < threshold_down:
            y[t] = -1
        else:
            y[t] = 0

    # Add the trend labels to the DataFrame
    df['trend'] = y
    return df


In [21]:
def assign_trend_scanning(dictionary):
    """
    Add the trend scanning label to every currency dataframe
    (trend_scanning is called with its default parameters)

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the labeled dataframe
    """
    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = trend_scanning(frame)
    return dictionary

In [22]:
def calculate_matrix_flags(df, window_size=20, threshold=0.05):
    """
    Calculate matrix flags for a given DataFrame with price data.

    For every row i >= window_size the window consists of the window_size
    prices directly before row i. The row is flagged
      1  (long)  if the last window price exceeds the first one by more than
                 the threshold AND the current price exceeds the last window
                 price by more than the threshold,
      -1 (short) for the mirrored downward case,
      0  otherwise (including the first window_size rows).

    Parameters:
        df (pd.DataFrame): DataFrame with a 'Price' column.
        window_size (int): Number of prices in the look-back window (>= 1).
        threshold (float): Percentage change threshold to detect significant price movements.

    Returns:
        pd.DataFrame: Copy of df with an additional integer 'matrix_flag' column;
                      all other columns are left as they are.

    Raises:
        ValueError: if window_size is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    # Work on a copy to avoid modifying the original DataFrame
    df = df.copy()

    prices = df['Price'].to_numpy()
    flags = np.zeros(len(df), dtype="int64")

    if len(df) > window_size:
        # element k of these arrays belongs to row i = window_size + k
        window_first = prices[:-window_size]
        window_last = prices[window_size - 1:-1]
        current = prices[window_size:]

        bull = (window_last > window_first * (1 + threshold)) & \
               (current > window_last * (1 + threshold))
        bear = (window_last < window_first * (1 - threshold)) & \
               (current < window_last * (1 - threshold))

        flagged = flags[window_size:]  # view into flags
        flagged[bull] = 1              # Long
        flagged[bear & ~bull] = -1     # Short (only when the bull condition failed)

    df['matrix_flag'] = flags

    return df


In [23]:
def assign_matrix_flags(dictionary):
    """
    Add the matrix flag to every currency dataframe
    (calculate_matrix_flags is called with its default parameters)

    Parameters:
    dictionary (dictionary): maps a currency name to its dataframe

    Returns:
    dictionary (dict): the same dictionary, each value replaced by the flagged dataframe
    """
    # only values of existing keys are replaced, so iterating over items() is safe
    for currency, frame in dictionary.items():
        dictionary[currency] = calculate_matrix_flags(frame)
    return dictionary