In [2]:
import ccxt
import pandas as pd
import talib
import numpy as np
from sklearn.metrics import accuracy_score

candle_number_to_calculate = 3

def fetchCryptoData():
    exchange = ccxt.binance()

    symbol = 'BTC/USDT'
    timeframe = '3m'

    # Fetch OHLCV data (public data)
    ohlcv = exchange.fetch_ohlcv(symbol, timeframe,20231001)

    # Convert the data to a DataFrame
    df = pd.DataFrame(ohlcv,columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])

    # Convert timestamp to a human-readable format
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')

    # Drop the integer index column
    df.reset_index(drop=True, inplace=True)

    return df

df = fetchCryptoData()
print(df.loc[39,['close','high']])

close    4315.32
high     4315.32
Name: 39, dtype: object


In [6]:
def scale(x, p):
    min_val = x[-p:].min()
    max_val = x[-p:].max()

    scaled_column = (x - min_val) / (max_val - min_val)

    return scaled_column

def add_talib_indicators(data):
    # RSI
    data['rs'] = talib.RSI(data['close'], timeperiod=28)
    data['rf'] = talib.RSI(data['close'], timeperiod=14)
    # ROC
    data['os'] = talib.ROC(data['close'], timeperiod=28)
    data['of'] = talib.ROC(data['close'], timeperiod=14)

    # CCI
    data['cs'] = talib.CCI(data['high'], data['low'], data['close'], timeperiod=28)
    data['cf'] = talib.CCI(data['high'], data['low'], data['close'], timeperiod=14)

    # MOM
    data['ms'] = scale(talib.MOM(data['close'], timeperiod=28),63) *100
    data['mf'] = scale(talib.MOM(data['close'], timeperiod=14),63) *100

    data['rs'] = np.nan_to_num(data['rs'])
    data['os'] = np.nan_to_num(data['os'])
    data['cs'] = np.nan_to_num(data['cs'])
    data['ms'] = np.nan_to_num(data['ms'])

    data['rf'] = np.nan_to_num(data['rf'])
    data['of'] = np.nan_to_num(data['of'])
    data['cf'] = np.nan_to_num(data['cf'])
    data['mf'] = np.nan_to_num(data['mf'])

    return data

df = add_talib_indicators(df)

In [7]:
def calculate_feature_1_slow(ind,dataframe):

    if ind == 'RSI':
        return dataframe['rs']
    elif ind == 'ROC':
        return dataframe['os']
    elif ind == 'CCI':
        return dataframe['cs']
    elif ind == 'MOM':
        return dataframe['ms']
    else:
        # Assuming avg is a function that calculates the average
        return (dataframe['rs']+dataframe['os']+dataframe['cs']+dataframe['ms'])/4
    

def calculate_feature_2_fast(ind,dataframe):

    if ind == 'RSI':
        return dataframe['rf']
    elif ind == 'ROC':
        return dataframe['of']
    elif ind == 'CCI':
        return dataframe['cf']
    elif ind == 'MOM':
        return dataframe['mf']
    else:
        # Assuming avg is a function that calculates the average
        return (dataframe['rf']+dataframe['of']+dataframe['cf']+dataframe['mf'])/4

def calculate_regression_value(dataframe):
    dataframe['actual_regression'] = -1 #Sell Signal
    condition = (dataframe['close'].shift(-candle_number_to_calculate) - dataframe['close']) > 0
    dataframe.loc[condition, 'actual_regression'] = 1 #Buy Signal
    return dataframe['actual_regression']

df['actual_regression'] = np.nan_to_num(calculate_regression_value(df))
df['feature_slow'] = np.nan_to_num(calculate_feature_1_slow("All",df))
df['feature_fast'] = np.nan_to_num(calculate_feature_2_fast("All",df))
df['feature_combination'] = df['feature_slow'] * df['feature_fast']

Split Data And Train Model

In [8]:
from sklearn.discriminant_analysis import StandardScaler
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier, VotingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split

def trainModel(df):
    # Generate some sample data
    filter_index = df[(df['feature_slow'] == 0) | (df['feature_fast'] == 0)].index

    X = df.iloc[:-candle_number_to_calculate, df.columns.isin(['feature_slow', 'feature_fast','close','rs'])]
    # Drop X 0 and last 3 candle
    X = X.drop(filter_index)
    X = X.iloc[:, X.columns.isin(['feature_slow', 'feature_fast','close','rs'])]

    y = df.iloc[:-candle_number_to_calculate, df.columns.isin(['actual_regression'])]
    # Drop Y 0 and last 3 candle
    y = y.drop(filter_index)
    y = y.iloc[:, y.columns.isin(['actual_regression'])]

    test_data = df[['feature_slow', 'feature_fast','close','rs']]

    # Convert X_train and X_test to NumPy arrays
    X = np.array(X)
    y = np.ravel(y)
    test_data = np.array(test_data)

    # Standardize features by removing the mean and scaling to unit variance
    scaler = StandardScaler()
    X_std = scaler.fit_transform(X)
    X_test_std = scaler.transform(test_data)

    X_train, X_test, y_train, y_test = train_test_split(X_std, y, test_size=0.1, random_state=42)

    # Create SVM classifier
    svm_classifier = SVC(
                          kernel='rbf',
                          C=100,
                          gamma='auto',
                          random_state=42,
                          probability=True,
                          cache_size=1,
                          )
    
    # Ensemble Model
    rf_model = RandomForestClassifier(n_estimators=100, 
                                      min_samples_leaf=50,
                                      oob_score=True,
                                      random_state=42)

    ensemble_model = VotingClassifier(estimators=[
                                                  ('svm', svm_classifier), 
                                                  ('rf', rf_model),
                                                  ], voting='hard')

    # Train the classifier
    ensemble_model.fit(X_train, y_train)

    # Make predictions on the test set
    y_pred = ensemble_model.predict(X_test_std)
    df['predicted_regression'] = y_pred

    # print(df.iloc[450:500,[14,18]])

    # Calculate accuray of buy and sell
    accuracy = accuracy_score((df['actual_regression']>0).tolist(), (df['predicted_regression']>0).tolist())
    print(f'Average Accuracy : {accuracy}')

    train_accuracy = ensemble_model.score(X_train, y_train)
    test_accuracy = ensemble_model.score(X_test, y_test)
    print(f'Training Accuracy : {train_accuracy} =====',f'Testing Accuracy : {test_accuracy}')

trainModel(df)

Average Accuracy : 0.534
Training Accuracy : 0.6832151300236406 ===== Testing Accuracy : 0.8085106382978723


In [34]:
def volatilifyFilter(dataframe):
    # Define BB windows 
    bb_window = 20
    bb_dev = 2

    # Calculate Bollinger Bands
    upper_band , middle_band, lower_band = talib.BBANDS(dataframe['close'],timeperiod=bb_window,nbdevup=bb_dev,nbdevdn=bb_dev)

    # Calculate ATR

    # Define threshold for atr
    dataframe.loc[
        (
            (dataframe['close'] < lower_band)
        ),
        'volatility_filter'] = 1
    
    dataframe.loc[
        (
            (dataframe['close'] > upper_band)
        ),
        'volatility_filter'] = -1
    
    return dataframe

df = volatilifyFilter(df)
print(df['volatility_filter'].value_counts())

volatility_filter
 1.0    37
-1.0    19
Name: count, dtype: int64


Model Testing

In [586]:
# Second Model
from sklearn.discriminant_analysis import StandardScaler
from sklearn.ensemble import RandomForestClassifier, VotingClassifier
from sklearn.svm import SVC
from sklearn.model_selection import train_test_split

def trainModel(df):
    X = df.iloc[:-candle_number_to_calculate, df.columns.isin(['feature_slow', 'feature_fast'])]
    y = df.iloc[:-candle_number_to_calculate, df.columns.isin(['actual_regression'])]
    test_data = df[['feature_slow', 'feature_fast']]

    # Convert X_train and X_test to NumPy arrays
    X = np.array(X)
    y = np.ravel(y)
    test_data = np.array(test_data)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

    # Standardize features by removing the mean and scaling to unit variance
    scaler = StandardScaler()
    X_train_std = scaler.fit_transform(X_train)
    X_test_std = scaler.transform(test_data)

    # Create SVM classifier
    svm_classifier = SVC(kernel='poly',
                        C=100,
                        gamma='auto',
                        random_state=42,
                        probability=True,
                        tol=0.1,
                        cache_size=1,
                        max_iter=100,
                        degree=3
                        )
    
    # Ensemble Model
    rf_model = RandomForestClassifier(n_estimators=100,
                                      oob_score=True,
                                      )

    ensemble_model = VotingClassifier(estimators=[('svm', svm_classifier), ('rf', rf_model)], voting='soft')

    # Train the classifier
    ensemble_model.fit(X_train_std, y_train)

    # Make predictions on the test set
    y_pred = ensemble_model.predict(X_test_std)
    df['predicted_regression'] = y_pred

    # print(df.iloc[0:50,[14,18]])

    # Calculate accuray of buy and sell
    accuracy = accuracy_score((df['actual_regression']>0).tolist(), (df['predicted_regression']>0).tolist())
    print(f'Average Accuracy : {accuracy}')

    train_accuracy = ensemble_model.score(X_train, y_train)
    test_accuracy = ensemble_model.score(X_test, y_test)
    print(f'Training Accuracy : {train_accuracy} =====',f'Testing Accuracy : {test_accuracy}')

trainModel(df)



Average Accuracy : 0.948
Training Accuracy : 0.5861297539149888 ===== Testing Accuracy : 0.6


In [9]:
# def adaptiveTrendFinder(self,dataframe:DataFrame):
#     # Variable Can Modify
#     devMultiplier = 2.0

#     # Calculate Deviation,PersionR,Slope,Intercept
#     stdDev01, pearsonR01, slope01, intercept01 = self.calcDev(self.periods[1],dataframe)
#     stdDev02, pearsonR02, slope02, intercept02 = self.calcDev(self.periods[2],dataframe)
#     stdDev03, pearsonR03, slope03, intercept03 = self.calcDev(self.periods[3],dataframe)
#     stdDev04, pearsonR04, slope04, intercept04 = self.calcDev(self.periods[4],dataframe)
#     stdDev05, pearsonR05, slope05, intercept05 = self.calcDev(self.periods[5],dataframe)
#     stdDev06, pearsonR06, slope06, intercept06 = self.calcDev(self.periods[6],dataframe)
#     stdDev07, pearsonR07, slope07, intercept07 = self.calcDev(self.periods[7],dataframe)
#     stdDev08, pearsonR08, slope08, intercept08 = self.calcDev(self.periods[8],dataframe)
#     stdDev09, pearsonR09, slope09, intercept09 = self.calcDev(self.periods[9],dataframe)
#     stdDev10, pearsonR10, slope10, intercept10 = self.calcDev(self.periods[10],dataframe)
#     stdDev11, pearsonR11, slope11, intercept11 = self.calcDev(self.periods[11],dataframe)
#     stdDev12, pearsonR12, slope12, intercept12 = self.calcDev(self.periods[12],dataframe)
#     stdDev13, pearsonR13, slope13, intercept13 = self.calcDev(self.periods[13],dataframe)
#     stdDev14, pearsonR14, slope14, intercept14 = self.calcDev(self.periods[14],dataframe)
#     stdDev15, pearsonR15, slope15, intercept15 = self.calcDev(self.periods[15],dataframe)
#     stdDev16, pearsonR16, slope16, intercept16 = self.calcDev(self.periods[16],dataframe)
#     stdDev17, pearsonR17, slope17, intercept17 = self.calcDev(self.periods[17],dataframe)
#     stdDev18, pearsonR18, slope18, intercept18 = self.calcDev(self.periods[18],dataframe)
#     stdDev19, pearsonR19, slope19, intercept19 = self.calcDev(self.periods[19],dataframe)

#     # Find the highest Pearson's R
#     # float highestPearsonR = pearsonR01
#     highestPearsonR = max(pearsonR01, pearsonR02, pearsonR03, pearsonR04, pearsonR05, pearsonR06, pearsonR07, pearsonR08, pearsonR09, pearsonR10, pearsonR11, pearsonR12, pearsonR13, pearsonR14, pearsonR15, pearsonR16, pearsonR17, pearsonR18, pearsonR19)

#     # Determine selected length, slope, intercept, and deviations
#     detectedPeriod  = 0
#     detectedSlope   = 0
#     detectedIntrcpt = 0
#     detectedStdDev  = 0

#     if highestPearsonR == pearsonR01:
#         detectedPeriod = self.periods[1]
#         detectedSlope = slope01
#         detectedIntrcpt = intercept01
#         detectedStdDev = stdDev01
#     elif highestPearsonR == pearsonR02:
#         detectedPeriod = self.periods[2] 
#         detectedSlope = slope02
#         detectedIntrcpt = intercept02
#         detectedStdDev = stdDev02
#     elif highestPearsonR == pearsonR03:
#         detectedPeriod = self.periods[3]  
#         detectedSlope = slope03
#         detectedIntrcpt = intercept03
#         detectedStdDev = stdDev03
#     elif highestPearsonR == pearsonR04:
#         detectedPeriod = self.periods[4]  
#         detectedSlope = slope04
#         detectedIntrcpt = intercept04
#         detectedStdDev = stdDev04
#     elif highestPearsonR == pearsonR05:
#         detectedPeriod = self.periods[5]  
#         detectedSlope = slope05
#         detectedIntrcpt = intercept05
#         detectedStdDev = stdDev05
#     elif highestPearsonR == pearsonR06:
#         detectedPeriod = self.periods[6]       
#         detectedSlope = slope06
#         detectedIntrcpt = intercept06
#         detectedStdDev = stdDev06
#     elif highestPearsonR == pearsonR07:
#         detectedPeriod = self.periods[7]      
#         detectedSlope = slope07
#         detectedIntrcpt = intercept07
#         detectedStdDev = stdDev07
#     elif highestPearsonR == pearsonR08:
#         detectedPeriod = self.periods[8]       
#         detectedSlope = slope08
#         detectedIntrcpt = intercept08
#         detectedStdDev = stdDev08
#     elif highestPearsonR == pearsonR09:
#         detectedPeriod = self.periods[9]       
#         detectedSlope = slope09
#         detectedIntrcpt = intercept09
#         detectedStdDev = stdDev09
#     elif highestPearsonR == pearsonR10:
#         detectedPeriod = self.periods[10]
#         detectedSlope = slope10
#         detectedIntrcpt = intercept10
#         detectedStdDev = stdDev10
#     elif highestPearsonR == pearsonR11:
#         detectedPeriod = self.periods[11]
#         detectedSlope = slope11
#         detectedIntrcpt = intercept11
#         detectedStdDev = stdDev11
#     elif highestPearsonR == pearsonR12:
#         detectedPeriod = self.periods[12]
#         detectedSlope = slope12
#         detectedIntrcpt = intercept12
#         detectedStdDev = stdDev12
#     elif highestPearsonR == pearsonR13:
#         detectedPeriod = self.periods[13]
#         detectedSlope = slope13
#         detectedIntrcpt = intercept13
#         detectedStdDev = stdDev13
#     elif highestPearsonR == pearsonR14:
#         detectedPeriod = self.periods[14]
#         detectedSlope = slope14
#         detectedIntrcpt = intercept14
#         detectedStdDev = stdDev14
#     elif highestPearsonR == pearsonR15:
#         detectedPeriod = self.periods[15]
#         detectedSlope = slope15
#         detectedIntrcpt = intercept15
#         detectedStdDev = stdDev15
#     elif highestPearsonR == pearsonR16:
#         detectedPeriod = self.periods[16]
#         detectedSlope = slope16
#         detectedIntrcpt = intercept16
#         detectedStdDev = stdDev16
#     elif highestPearsonR == pearsonR17:
#         detectedPeriod = self.periods[17]
#         detectedSlope = slope17
#         detectedIntrcpt = intercept17
#         detectedStdDev = stdDev17
#     elif highestPearsonR == pearsonR18:
#         detectedPeriod = self.periods[18]
#         detectedSlope = slope18
#         detectedIntrcpt = intercept18
#         detectedStdDev = stdDev18
#     elif highestPearsonR == pearsonR19:
#         detectedPeriod = self.periods[19]
#         detectedSlope = slope19
#         detectedIntrcpt = intercept19
#         detectedStdDev = stdDev19
#     else:
#         # Default case
#         raise Exception("Cannot Find Highest PearsonR") 
    
#     # Calculate start and end price based on detected slope and intercept
#     startPrice = math.exp(detectedIntrcpt + detectedSlope * (detectedPeriod - 1))
#     endPrice = math.exp(detectedIntrcpt)
#     startAtBar = len(dataframe) - detectedPeriod + 1

#     # Calculate Upper Upper Price and Upper End price
#     upperStartPrice = startPrice * math.exp(devMultiplier * detectedStdDev)
#     upperEndPrice   =   endPrice * math.exp(devMultiplier * detectedStdDev)

#     # Calculate Lower Price and Lower End Price
#     lowerStartPrice = startPrice / math.exp(devMultiplier * detectedStdDev)
#     lowerEndPrice =   endPrice / math.exp(devMultiplier * detectedStdDev)

#     # Calculate If Uptrend or Downtrend and how strength is this trend
#     # Also Know how many this trend exist with period
#     # ====== Strategies ======
#     # If EndPrice > StartPrice Uptrend
#     # If EndPrice < StartPrice Downtrend
#     trend_direction = endPrice - startPrice

#     return trend_direction,detectedPeriod,highestPearsonR



In [None]:
# freqtrade download-data --exchange binance --pairs 1000BONK/USDT:USDT --timerange 20240106-20240107 -t 5m