In [5]:
# Step 1 Import
import ccxt as ccxt
import pandas as pd
import pandas_ta as ta
import numpy as np
import math
from numba import njit

In [6]:
# Step 2 Fetch Data
def fetch_ohlcv_info(exchange: ccxt.binance, symbol, timeframe, limit) -> pd.DataFrame:
    # Fetch 10,000 candles in chunks of 1500 candles
    num_candles_to_fetch = limit
    candles_per_request = 1000
    start_index = 0
    dataframe = pd.DataFrame()

    while start_index < num_candles_to_fetch:
        # Determine the end index for this request
        end_index = min(start_index + candles_per_request,
                        num_candles_to_fetch)

        # Fetch candles for this chunk
        candles = exchange.fetch_ohlcv(
            symbol, timeframe, limit=candles_per_request, since=None)

        # Convert candles to DataFrame
        df = pd.DataFrame(candles, columns=[
                          'date', 'open', 'high', 'low', 'close', 'volume'])
        df['date'] = pd.to_datetime(df['date'], unit='ms')
        # Append the candles to the DataFrame
        dataframe = pd.concat([dataframe, df], ignore_index=True)

        # Update start index for the next request
        start_index += candles_per_request
    return dataframe

exchange = ccxt.binance()
symbol = "BTC/USDT:USDT"
df = fetch_ohlcv_info(exchange,symbol,"1m",2000)

In [7]:
from numba import njit
import numpy as np

@njit
def calculate_trend_direction_jit(index, close, periods) -> tuple:
    if index >= periods[-1]:
        devMultiplier = 2.0
        highestPearsonR = -np.inf
        detectedPeriod = 0
        detectedSlope = 0
        detectedIntrcpt = 0
        detectedStdDev = 0
        for period in periods:
            stdDev, pearsonR, slope, intercept = calc_dev_jit(period, close, index)
            if pearsonR > highestPearsonR:
                highestPearsonR = pearsonR
                detectedPeriod = period
                detectedSlope = slope
                detectedIntrcpt = intercept
                detectedStdDev = stdDev
        if highestPearsonR == -np.inf:
            raise Exception("Cannot find highest PearsonR")
        startPrice = np.exp(detectedIntrcpt + detectedSlope * (detectedPeriod - 1))
        endPrice = np.exp(detectedIntrcpt)
        trend_direction = endPrice - startPrice
        return trend_direction, detectedPeriod, highestPearsonR

    return 0, 0, 0


@njit
def calc_dev_jit(length, close, index) -> tuple:
    log_source = close[index + 1 - length: index + 1]
    indices = np.arange(1, length + 1)
    sum_x = np.sum(indices)
    sum_xx = np.sum(indices ** 2)
    sum_y = np.sum(log_source)
    sum_yx = np.sum(indices * log_source)

    denominator = length * sum_xx - sum_x ** 2
    if denominator == 0:
        return 0, 0, 0, 0

    slope = (length * sum_yx - sum_x * sum_y) / denominator
    average = sum_y / length
    intercept = average - (slope * sum_x / length) + slope

    deviations = log_source - intercept - slope * (indices - 1)
    sum_dev = np.sum(deviations ** 2)
    if length <= 1 or sum_dev == 0:
        std_dev = 0
    else:
        std_dev = np.sqrt(sum_dev / (length - 1))

    dxt = log_source - average
    dyt = slope * (indices - 1) - (intercept + slope * (length - 1) * 0.5)
    sum_dxx = np.sum(dxt ** 2)
    sum_dyy = np.sum(dyt ** 2)
    sum_dyx = np.sum(dxt * dyt)

    divisor = sum_dxx * sum_dyy
    if divisor <= 0:
        pearson_r = 0
    else:
        pearson_r = sum_dyx / np.sqrt(divisor)

    return std_dev, pearson_r, slope, intercept

def adaptiveTrendFinder_jit(df: pd.DataFrame, periods):
    close = np.log(df['close'].to_numpy())
    trend_directions = np.zeros(len(close))
    detected_periods = np.zeros(len(close))
    highest_pearson_rs = np.zeros(len(close))
    for i in range(len(close)):
        trend_direction, detected_period, highest_pearson_r = calculate_trend_direction_jit(i, close, periods)
        trend_directions[i] = trend_direction
        detected_periods[i] = detected_period
        highest_pearson_rs[i] = highest_pearson_r
    df['trend_direction'] = -trend_directions
    df['detected_period'] = detected_periods
    df['highest_pearson_r'] = highest_pearson_rs
    return df


In [8]:
print(adaptiveTrendFinder_jit(df,[20,30,40,50]))

                    date      open      high       low     close   volume  \
0    2025-01-21 13:48:00  104766.5  104812.5  104750.1  104784.0  165.432   
1    2025-01-21 13:49:00  104784.1  104805.2  104738.7  104745.5   65.054   
2    2025-01-21 13:50:00  104745.4  104796.4  104710.4  104735.9   94.001   
3    2025-01-21 13:51:00  104734.9  104734.9  104572.7  104572.8   71.986   
4    2025-01-21 13:52:00  104572.7  104635.5  104513.4  104522.6   66.056   
...                  ...       ...       ...       ...       ...      ...   
1995 2025-01-22 06:23:00  105809.0  105850.8  105772.3  105772.4   75.744   
1996 2025-01-22 06:24:00  105772.3  105772.4  105717.2  105743.5   26.123   
1997 2025-01-22 06:25:00  105743.6  105772.4  105743.4  105743.7   20.160   
1998 2025-01-22 06:26:00  105743.7  105743.7  105473.3  105484.9  449.747   
1999 2025-01-22 06:27:00  105484.8  105492.3  105319.1  105372.5  243.354   

      trend_direction  detected_period  highest_pearson_r  
0           -0.