In [1]:
import pandas as pd
import numpy as np
import statistics

In [2]:
def calculate_volatility(prices, delta, timespan=30):
    """
    This function calculates price volatility as the exponentially weighted
    standard deviation of returns measured against an earlier reference price.

    Parameters:
    prices (series): Series of prices where the index should be sorted pandas timestamps
    delta (pd.Timedelta): Time gap subtracted from each timestamp to locate the reference price
    timespan (integer): Span of the exponentially weighted window, default is 30

    Returns:
    Volatilities (series): Calculated volatility for each timestamp that has an
    earlier reference price (the leading timestamps without one are omitted)
    """

    # searchsorted gives, for every t, the position of the first timestamp
    # that is not before t - delta; position 0 means nothing precedes it.
    positions = prices.index.searchsorted(prices.index - delta)
    positions = positions[positions > 0]

    # Map each remaining timestamp to the timestamp one position earlier,
    # i.e. the last observation strictly before t - delta.
    n_skipped = prices.shape[0] - positions.shape[0]
    reference = pd.Series(prices.index[positions - 1], index=prices.index[n_skipped:])

    # Return of the current price relative to its reference price.
    returns = prices.loc[reference.index] / prices.loc[reference.values].values - 1

    return returns.ewm(span=timespan).std()

In [3]:
def get_vol(prices, delta, span=30):
    """
    Estimate volatility as the exponentially weighted standard deviation of
    returns taken against an earlier reference price.

    Parameters:
    prices (series): Series of prices where the index should be sorted pandas timestamps
    delta (pd.Timedelta): Time gap subtracted from each timestamp to locate the reference price
    span (integer): Span of the exponentially weighted window, default is 30

    Returns:
    series: volatility per timestamp, to be added to the target dataframe
    (leading timestamps without an earlier reference price are omitted)
    """
    stamps = prices.index

    # Position of the first timestamp not before t - delta, for every t;
    # a position of 0 means there is no earlier observation to compare with.
    lagged_pos = stamps.searchsorted(stamps - delta)
    lagged_pos = lagged_pos[lagged_pos > 0]

    # Pair the surviving (trailing) timestamps with the timestamp one
    # position before the match.
    n_dropped = prices.shape[0] - lagged_pos.shape[0]
    reference = pd.Series(stamps[lagged_pos - 1], index=stamps[n_dropped:])

    # Returns of the current price over its reference price.
    current_prices = prices.loc[reference.index]
    earlier_prices = prices.loc[reference.values].values
    returns = current_prices / earlier_prices - 1

    # Exponentially weighted standard deviation of those returns.
    return returns.ewm(span=span).std()

In [4]:
def fixed_time_horizon_not_dynamic(data, threshold):
    """
    This function assigns labels to the data points according to the fixed time horizon method

    Parameters:
    data (dataframe): dataframe with a 'Price' column and a 'horizon' column whose
                      values are index labels of the row that closes each horizon
    threshold (float): threshold for labeling a datapoint

    Returns:
    data (dataframe): the same dataframe (modified in place) with an extra
                      'fth_label' column holding 1, -1 or 0; rows whose return
                      cannot be compared (NaN) keep the empty-string placeholder

    Raises:
    KeyError: if a horizon value is not present in the index
    ValueError: if a horizon value matches more than one row
    """

    # Prices at the beginning and end of every horizon, looked up in one pass
    # instead of one .loc call per row.
    price_t0 = data["Price"].to_numpy(dtype=float)
    price_t1 = data["Price"].loc[data["horizon"].tolist()].to_numpy(dtype=float)
    if len(price_t1) != len(price_t0):
        raise ValueError("every horizon must match exactly one row of the index")

    # Relative return within the fixed horizon.
    relative_return = price_t1 / price_t0 - 1

    # Explicit object dtype: the column mixes the "" placeholder with integer
    # labels, so its type must not depend on what pandas infers from "".
    labels = np.full(len(data), "", dtype=object)

    # Written from lowest to highest priority so that, as in an if/elif chain,
    # "above threshold" wins over "below -threshold" wins over "in between".
    labels[(relative_return >= -threshold) & (relative_return <= threshold)] = 0
    labels[relative_return < -threshold] = -1
    labels[relative_return > threshold] = 1

    data["fth_label"] = labels

    return data

In [5]:
def apply_fixed_time_horizon(dictionary, threshold):
    """
    Assign the fixed time horizon label to every currency in the dictionary

    Parameters:
    dictionary (dictionary): maps each currency name to its dataframe
    threshold (float): threshold passed on to the labeling function

    Returns:
    dictionary (dict): the same dictionary, each entry replaced by its labelled dataframe
    """

    # replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = fixed_time_horizon_not_dynamic(frame, threshold)

    return dictionary

In [6]:
def get_touches(prices, events, factors=(1, 1)):
    """
    This function determines the earliest times when the price touches
    the stop loss or take profit thresholds.

    Parameters:
    prices (Series): The time series of prices, indexed by timestamp.
    events (DataFrame): DataFrame indexed by event start time with the following columns:
        horizon: Timestamp of the next horizon.
        threshold: Unit height of top and bottom barriers.
        side: The side of each bet.
    factors (sequence of two numbers): Multipliers of the threshold that set the
        height of the top (first) and bottom (second) barrier. A value that is
        not greater than 0 disables that barrier.

    Returns:
    DataFrame: Copy of the 'horizon' column plus 'stop_loss' and 'take_profit'
    columns holding the earliest touch time of each barrier (missing when the
    barrier is not touched between the event start and its horizon).
    """

    # Start from a copy of the horizon column so the events frame is untouched
    out = events[['horizon']].copy(deep=True)

    # A disabled barrier is an all-NaN threshold: comparisons against NaN are
    # never true, so no touch is ever reported for it.
    if factors[0] > 0:
        thresh_uppr = factors[0] * events['threshold']
    else:
        thresh_uppr = pd.Series(index=events.index, dtype=float)

    if factors[1] > 0:
        thresh_lwr = -factors[1] * events['threshold']
    else:
        thresh_lwr = pd.Series(index=events.index, dtype=float)

    # Collect the touch times row by row and attach them as whole columns at
    # the end, rather than growing the frame one cell at a time.
    stop_loss_times = []
    take_profit_times = []
    rows = zip(events.index, events['horizon'], events['side'], thresh_uppr, thresh_lwr)
    for loc, horizon, side, upper, lower in rows:
        path = prices.loc[loc:horizon]                 # Path prices
        path = (path / prices.loc[loc] - 1) * side     # Path returns, signed by bet side

        # min() of an empty index is a missing value, i.e. "never touched"
        stop_loss_times.append(path[path < lower].index.min())
        take_profit_times.append(path[path > upper].index.min())

    out['stop_loss'] = stop_loss_times
    out['take_profit'] = take_profit_times

    return out

In [7]:
def get_labels(touches):
    """
    Turn stop loss / take profit touch times into a label per event.

    Parameters:
    touches (DataFrame): DataFrame with columns 'stop_loss' and 'take_profit' holding the times these events occur.

    Returns:
    DataFrame: Copy of the input with an additional 'label' column where:
        -1 indicates the stop loss was the earliest touch (also used when both touch at the same time),
        1 indicates the take profit was the earliest touch,
        0 indicates neither threshold was hit.
    """

    labelled = touches.copy(deep=True)

    # Row-wise earliest of the two touch times; missing when neither occurred
    earliest = touches[['stop_loss', 'take_profit']].min(axis=1)

    for row_key, when in earliest.items():
        if pd.isnull(when):
            outcome = 0      # no barrier touched
        else:
            # the stop loss wins whenever it is the earliest touch
            outcome = -1 if when == touches.loc[row_key, 'stop_loss'] else 1
        labelled.loc[row_key, 'label'] = outcome

    return labelled


In [8]:
def apply_tbm_to_currency(data, delta):
    """
    Label each row of a currency dataframe from its stop loss / take profit touches.
    ("tbm" presumably refers to the triple-barrier method - confirm.)

    Parameters:
    data (DataFrame): The dataframe containing the currency price data with 'Price' and 'horizon' columns.
    delta: Passed to get_vol as its time gap (get_vol subtracts it from the price index,
           so presumably a pd.Timedelta - confirm against the caller).

    Returns:
    DataFrame: A new dataframe (built with assign) carrying the extra columns
    'threshold' (the get_vol output) and 'tbm_label' (the label from get_labels).
    """

    # Volatility estimate doubles as the barrier height of each event
    volatility = get_vol(data.Price, delta)
    data = data.assign(threshold=volatility)

    # One event per row; a constant side of 1.0 means long-only positions
    events = data[['horizon', 'threshold']].assign(side=pd.Series(1., data.index))

    # Earliest barrier touches, then the label derived from them
    touches = get_labels(get_touches(data.Price, events, [1, 1]))

    return data.assign(tbm_label=touches.label)

In [9]:
def apply_tbm(dictionary, volatility_delta):
    """
    Assign the tbm label to every currency in the dictionary

    Parameters:
    dictionary (dictionary): maps each currency name to its dataframe
    volatility_delta: time gap forwarded to apply_tbm_to_currency for the volatility estimate

    Returns:
    dictionary (dict): the same dictionary, each entry replaced by its labelled dataframe
    """

    # replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = apply_tbm_to_currency(frame, volatility_delta)

    return dictionary

In [10]:
def calculate_relative_return(df):
    """
    This function calculates the relative return within the horizon

    Parameters:
    df (dataframe): dataframe with a 'Price' column and a 'horizon' column whose
                    values are index labels of the row that closes each horizon

    Returns:
    df (dataframe): the same dataframe (modified in place) with an extra float
                    column 'relative_return' = price at horizon / current price - 1

    Raises:
    KeyError: if a horizon value is not present in the index
    ValueError: if a horizon value matches more than one row
    """

    # Prices at the beginning and end of every horizon, looked up in one pass
    # instead of one .loc call per row.
    price_t0 = df["Price"].to_numpy(dtype=float)
    price_t1 = df["Price"].loc[df["horizon"].tolist()].to_numpy(dtype=float)
    if len(price_t1) != len(price_t0):
        raise ValueError("every horizon must match exactly one row of the index")

    # Assigning the whole array keeps the column numeric (float64) rather than
    # an object column seeded with an empty-string placeholder.
    df["relative_return"] = price_t1 / price_t0 - 1

    return df

In [11]:
def assign_relative_returns(dictionary):
    """
    Add the relative return column to every currency dataframe

    Parameters:
    dictionary (dictionary): maps each currency name to its dataframe

    Returns:
    dictionary (dict): the same dictionary, each dataframe extended with the relative return within its horizon
    """

    # replace each entry in place with the extended frame
    for currency, frame in dictionary.items():
        dictionary[currency] = calculate_relative_return(frame)

    return dictionary

In [12]:
def get_market_return(dictionary, index):
    """
    Collect the relative return of every currency at one row position

    Parameters:
    dictionary (dictionary): maps each currency name to a dataframe with a 'relative_return' column
    index (int): row position (counted from the first row) to read

    Returns:
    market_returns (list): the 'relative_return' value at that position for every
    currency whose dataframe is long enough to have such a row
    """

    # currencies with too few rows simply do not contribute
    return [
        frame.iloc[index].relative_return
        for frame in dictionary.values()
        if len(frame) > index
    ]

In [13]:
def calculate_mean_median_market_return(dictionary, reference_currency="Bitcoin"):
    """
    This function calculates the market mean and median return for each data point

    Parameters:
    dictionary (dictionary): maps each currency name to a dataframe with a 'relative_return' column
    reference_currency (str): key of the currency whose number of rows fixes how many
                              data points are evaluated; default is "Bitcoin"

    Returns:
    mean_returns (list): list with mean returns of the cryptocurrencies for every data point
    median_returns (list): list with median returns of the cryptocurrencies for every data point

    Raises:
    KeyError: if reference_currency is not a key of the dictionary
    """

    mean_returns = []
    median_returns = []

    # NOTE(review): get_market_return reads rows by position from the top of each
    # frame, so this presumably assumes all frames start at the same timestamp -
    # confirm against how the frames are aligned upstream.
    for position in range(len(dictionary[reference_currency])):
        # Returns of every currency that has a row at this position
        market_returns = get_market_return(dictionary, position)
        mean_returns.append(statistics.mean(market_returns))
        median_returns.append(statistics.median(market_returns))

    return mean_returns, median_returns

In [14]:
def label_excess_over_mean_median(df, mean_returns, median_returns):
    """
    This function labels each datapoint by comparing its relative return with the market mean / median return

    Parameters:
    df (dataframe): crypto currency data with a 'relative_return' column
    mean_returns (list): market mean return per data point; only the last len(df) entries are used
    median_returns (list): market median return per data point; only the last len(df) entries are used

    Returns:
    df (dataframe): same input df (modified in place) with 2 extra float columns
                    'excess_over_mean' and 'excess_over_median' holding 1 (above),
                    -1 (below), 0 (equal) or NaN (not comparable)

    Raises:
    ValueError: if a list of market returns has fewer entries than df has rows
    """

    n_rows = len(df)
    returns = df["relative_return"].to_numpy(dtype=float)

    def market_tail(market):
        # Explicit start position: market[-n_rows:] would return the WHOLE
        # list when n_rows is 0.
        tail = np.asarray(market[max(len(market) - n_rows, 0):], dtype=float)
        if len(tail) != n_rows:
            raise ValueError("Lengths must match to compare")
        return tail

    # Validate both inputs before touching df so a failure leaves it unchanged
    references = {
        "excess_over_mean": market_tail(mean_returns),
        "excess_over_median": market_tail(median_returns),
    }

    for column, reference in references.items():
        df[column] = np.select(
            [returns > reference, returns < reference, returns == reference],
            [1.0, -1.0, 0.0],
            default=np.nan,
        )

    return df

In [15]:
def assign_excess_over_mean_median_label(dictionary, mean_returns, median_returns):
    """
    Add the excess-over-mean and excess-over-median labels to every currency in the dictionary.

    Parameters:
    dictionary (dict): A dictionary where keys are currency names and values are DataFrames containing currency data.
    mean_returns: The market mean returns, forwarded unchanged to the labeling function.
    median_returns: The market median returns, forwarded unchanged to the labeling function.

    Returns:
    dict: The same dictionary, each entry replaced by its labelled DataFrame.
    """

    # Replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = label_excess_over_mean_median(frame, mean_returns, median_returns)

    return dictionary

In [16]:
def label_tail_sets(df):
    """
    Label each row by where the most recent volatility-adjusted return falls
    within the distribution of all earlier volatility-adjusted returns.

    Parameters:
    df (pd.DataFrame): DataFrame containing at least a 'Price' column.

    Returns:
    pd.DataFrame: Copy of the input with an additional 'tail_sets' column where:
                  - 1 indicates the return is above the 75% quantile.
                  - -1 indicates the return is below the 25% quantile.
                  - 0 indicates the middle range, or too little history to decide.
                  Columns named 'volatility' or 'volatility_adjusted_return' are
                  removed from the result if present.
    """
    lookback = 10

    # Work on a copy so the caller's frame is left alone
    labelled = df.copy()
    labelled['tail_sets'] = 0

    # NOTE(review): every pass recomputes the rolling statistics on the whole
    # history, so the cost grows quadratically with the number of rows.
    for row in range(lookback, len(labelled)):
        target = labelled.index[row]
        try:
            # Rows strictly before the current one; the row being labelled is
            # not part of its own history.
            history = labelled['Price'].iloc[:row]

            # Percentage change scaled by the rolling standard deviation of the
            # price; rows where this is undefined are discarded.
            rolling_std = history.rolling(window=lookback).std()
            scaled = (history.pct_change() / rolling_std).dropna()

            if len(scaled) < lookback:
                continue

            upper_threshold = scaled.quantile(0.75)
            lower_threshold = scaled.quantile(0.25)

            # Latest available value, i.e. from the last row before the current one
            latest = scaled.iloc[-1]
            if latest > upper_threshold:
                verdict = 1
            elif latest < lower_threshold:
                verdict = -1
            else:
                verdict = 0
            labelled.at[target, 'tail_sets'] = verdict
        except KeyError:
            # A failed lookup (e.g. no 'Price' column) falls back to the neutral label
            labelled.at[target, 'tail_sets'] = 0

    # Remove helper columns by name should the frame carry them
    labelled = labelled.drop(columns=['volatility', 'volatility_adjusted_return'], errors='ignore')

    return labelled

In [18]:
def assign_tail_sets(dictionary):
    """
    Add the tail set label to every DataFrame in the dictionary.

    Parameters:
    dictionary (dict): A dictionary where keys are currency names and values are DataFrames containing price data for each currency.
                       Each DataFrame must contain a 'Price' column.

    Returns:
    dict: The same dictionary, each entry replaced by a DataFrame that includes a 'tail_sets' column.
    """

    # Replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = label_tail_sets(frame)

    return dictionary


In [20]:
def trend_scanning(df, max_L=10, threshold_up=1, threshold_down=-1):

    """
    This function scans for trends in the given price data by performing OLS regression of the
    price on time over varying look-back periods and assigns labels based on the t-value of the
    regression slope.

    Parameters:
    df (DataFrame): A DataFrame containing the 'Price' column with price data to be analyzed.
    max_L (int, optional): The maximum look-back period to consider for trend scanning. Default is 10.
    threshold_up (float, optional): The upper threshold for the t-value to indicate a positive trend. Default is 1.
    threshold_down (float, optional): The lower threshold for the t-value to indicate a negative trend. Default is -1.

    Returns:
    DataFrame: The original DataFrame (modified in place) with an additional 'trend' column where:
        1 indicates a positive trend,
        -1 indicates a negative trend,
        0 indicates no trend, too little history (the first max_L rows), or no
          look-back period with a usable t-value.
    """

    # Extract the price series and get the length of the series
    prices = df['Price'].values
    n_obs = len(prices)

    # Initialize the labels array
    labels = np.zeros(n_obs)

    # The regressor is just 0..L-1, so everything that depends only on the
    # look-back length is built once instead of once per row. Look-backs below
    # 3 leave no residual degrees of freedom (L - 2 <= 0) and are skipped.
    designs = []
    for lookback in range(3, max_L + 1):
        steps = np.arange(lookback)
        X = np.vstack([np.ones(lookback), steps]).T
        projector = np.linalg.pinv(X.T @ X) @ X.T       # maps a window to (intercept, slope)
        spread = np.sum((steps - np.mean(steps)) ** 2)  # sum of squared deviations of the time steps
        designs.append((lookback, X, projector, spread))

    # Loop over each time point starting from max_L
    for t in range(max_L, n_obs):
        best_t_value = -np.inf

        # Only prices strictly before t are used, so there is no look-ahead
        for lookback, X, projector, spread in designs:
            window = prices[t - lookback:t]
            beta = projector @ window
            residuals = window - X @ beta
            sigma_beta = np.sqrt(np.sum(residuals ** 2) / (lookback - 2) / spread)

            # A zero or infinite standard error gives no meaningful t-value
            if sigma_beta == 0 or sigma_beta == np.inf:
                continue
            t_value = beta[1] / sigma_beta

            # NOTE(review): keeps the largest SIGNED t-value, so a row is only
            # labelled -1 when every look-back is below threshold_down -
            # confirm that the largest absolute t-value is not what was meant.
            if t_value > best_t_value:
                best_t_value = t_value

        if best_t_value == -np.inf:
            # No look-back produced a usable t-value: that is "no trend",
            # not a negative trend.
            labels[t] = 0
        elif best_t_value > threshold_up:
            labels[t] = 1   # Positive trend
        elif best_t_value < threshold_down:
            labels[t] = -1  # Negative trend
        else:
            labels[t] = 0   # No significant trend

    # Add the trend labels to the DataFrame
    df['trend'] = labels
    return df

In [21]:
def assign_trend_scanning(dictionary):
    """
    Run the trend scanning labeling on every DataFrame in the dictionary.

    Parameters:
    dictionary (dict): A dictionary where keys are currency names and values are DataFrames containing price data for each currency.
                       Each DataFrame must contain a 'Price' column.

    Returns:
    dict: The same dictionary, each entry replaced by a DataFrame that includes a 'trend' column where:
        1 indicates a positive trend,
        -1 indicates a negative trend,
        0 indicates no trend.
    """

    # Replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = trend_scanning(frame)

    return dictionary

In [24]:
def calculate_matrix_flags(df, window_size=10, threshold=0.001):
    """
    Flag rows whose two preceding price windows match a bull or bear flag template.

    Parameters:
        df (pd.DataFrame): DataFrame with a 'Price' column.
        window_size (int): Number of rows in each of the two consecutive windows that precede a row.
        threshold (float): Scales the minimum template match score (threshold * window_size * 10).

    Returns:
        pd.DataFrame: The same DataFrame (modified in place) with an additional 'matrix_flag'
        column: 1 for a bull template match, -1 for a bear template match, 0 otherwise.
    """

    # Every row starts out unflagged
    df['matrix_flag'] = 0

    def to_deciles(values):
        # Decile number per value
        return pd.qcut(values, q=10, labels=False, duplicates='drop')

    # Two full windows are needed before the first row can be evaluated
    double_window = window_size * 2
    if len(df) < double_window:
        return df

    # Templates are constants, so they are built once up front.
    # Bull flag template based on the provided image
    bull_flag_template = np.array([
        [0.5, 0, -1, -1, -1, -1, -1, -1, -1, 0],
        [1, 0.5, 0, -0.5, -1, -1, -1, -1, -1, 0],
        [1, 1, 0.5, 0, -0.5, -0.5, -0.5, 0, 0.5, 0.5],
        [0.5, 1, 1, 0.5, 0, -0.5, -0.5, 0, 1, 1],
        [0, 0.5, 1, 1, 0.5, 0, 0, 0, 0.5, 1],
        [0, 0, 0.5, 1, 1, 0.5, 0, 0, 0.5, 1],
        [-0.5, 0, 0.5, 1, 1, 1, 1, 0.5, 1, 1],
        [-0.5, -1, 0, 0.5, 1, 1, 1, 0, 0.5, 1],
        [-1, -1, -1, -0.5, 0, 0.5, 1, 0, 0.5, 1],
        [-1, -1, -1, -1, -1, 0, 0.5, 0.5, -2, -2.5]
    ])
    # The bear template is the bull template flipped along both axes
    bear_flag_template = np.flipud(np.fliplr(bull_flag_template))

    for i in range(double_window, len(df)):
        try:
            # The two consecutive windows that end right before row i
            older_prices = df['Price'].iloc[i - double_window:i - window_size]
            newer_prices = df['Price'].iloc[i - window_size:i]

            # Skip when either window has fewer than 10 distinct values
            if len(older_prices.unique()) < 10 or len(newer_prices.unique()) < 10:
                continue

            older_deciles = to_deciles(older_prices)
            newer_deciles = to_deciles(newer_prices)

            # 10x10 grid: row = decile, column = position in time
            grid = np.zeros((10, 10))
            for t in range(window_size):
                older_bin = older_deciles.iloc[t]
                newer_bin = newer_deciles.iloc[t]
                shifted = t + window_size // 2   # the newer window is offset by half a window

                if older_bin is not None and older_bin < 10 and t < 10:
                    grid[int(older_bin), t] += 1
                if newer_bin is not None and newer_bin < 10 and shifted < 10:
                    grid[int(newer_bin), shifted] += 1

            min_score = threshold * window_size * 10

            # The bull template is checked first; bear only if bull did not match
            if np.sum(grid * bull_flag_template) > min_score:
                df.loc[df.index[i], 'matrix_flag'] = 1  # Long
            elif np.sum(grid * bear_flag_template) > min_score:
                df.loc[df.index[i], 'matrix_flag'] = -1  # Short
        except Exception as e:
            print(f"Error at index {i}: {e}")

    return df

In [22]:
def assign_matrix_flags(dictionary):
    """
    Run the matrix flag calculation on every DataFrame in the dictionary.

    Parameters:
    dictionary (dict): A dictionary where keys are currency names and values are DataFrames containing data for each currency.

    Returns:
    dict: The same dictionary, each entry replaced by the DataFrame returned by the matrix flag calculation.
    """

    # Replace each entry in place with the flagged frame
    for currency, frame in dictionary.items():
        dictionary[currency] = calculate_matrix_flags(frame)

    return dictionary

In [25]:
def next_period_return_labeling(df, no_trade_threshold=0.001):
    """
    Label each row by the sign of the return over the following period.

    Parameters:
        df (pd.DataFrame): DataFrame with a 'Price' column.
        no_trade_threshold (float): Returns within +/- this value are labelled 0 ('no trade').

    Returns:
        pd.DataFrame: The same DataFrame (modified in place) with additional 'return' and
        'next_period' columns; the last row is removed because it has no following period.
    """

    # Percentage change into the NEXT row, placed on the current row
    upcoming = df['Price'].pct_change().shift(-1)
    df['return'] = upcoming

    # Everything is 'no trade' unless the upcoming return clears the threshold
    df['next_period'] = 0
    is_long = upcoming > no_trade_threshold
    is_short = upcoming < -no_trade_threshold
    df.loc[is_long, 'next_period'] = 1  # Long
    df.loc[is_short, 'next_period'] = -1  # Short

    # The final row has no future price and therefore no label
    df.drop(df.index[-1:], inplace=True)

    return df

In [26]:
def apply_next_period_return_labeling(dictionary, no_trade_threshold=0.001):
    """
    Assign the next-period return label to every currency in the dictionary.

    Parameters:
    dictionary (dictionary): Dictionary with DataFrames of crypto currencies in a given directory.
    no_trade_threshold (float): Threshold forwarded to the labeling function for near zero returns.

    Returns:
    dictionary (dictionary): The same dictionary, each entry replaced by its labelled DataFrame.
    """

    # Replace each entry in place with the labelled frame
    for currency, frame in dictionary.items():
        dictionary[currency] = next_period_return_labeling(frame, no_trade_threshold)

    return dictionary