In [1]:
import pandas as pd
import numpy as np
import sqlalchemy
from datetime import datetime
import time
import ta

import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots

from data.get_data import *

from IPython.display import clear_output

import warnings
warnings.filterwarnings('ignore')

%load_ext autoreload
%autoreload 2

In [2]:
from data import db

DB = db.connect_db("database")
DB.get_data("QNT")

Unnamed: 0_level_0,open,high,low,close,volume
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2021-07-29,100.4,156.7,100.4,134.1,564263.578
2021-07-30,134.2,137.7,120.7,129.8,141011.202
2021-07-31,129.7,140.0,123.2,135.0,50786.290
2021-08-01,135.0,179.9,135.0,175.3,299545.806
2021-08-02,175.6,191.7,142.8,156.9,272290.646
...,...,...,...,...,...
2023-12-27,144.3,145.4,136.9,139.2,68822.503
2023-12-28,139.3,144.1,135.5,135.7,53689.563
2023-12-29,135.6,137.6,131.1,134.1,48383.292
2023-12-30,134.0,153.4,131.9,147.7,57198.252


In [3]:
import ta
from feature.indicator import *
from multiAnalysis.volatility import GARCH

SYMBOL = "BTC"
start = "2023-01"
end = "2023-12"

DB = db.connect_db("database")
data = DB.get_data("BTC")
data = data.loc[start : end]

rets = data.close.pct_change().dropna()*100
rets.name = "returns"

ma_1 = 3
ma_2 = 7
ma_3 = 14

atr_ = 24
rsi_ = 14

window = 10

data['returns'] = data.close.pct_change()

data['ma_1'] = ma(data, ma_1)
data['ma_2'] = ma(data, ma_2)
data['ma_3'] = ma(data, ma_3)

data['vol_std'] = rets.rolling(window).std()
data['vol'] = rets.ewm(span = 5).std()

data['sar'], data['sar_down'], data['sar_up'] = sar(data)

data['garch'] = GARCH(rets)

data['r_target'] = data['returns'].rolling(3).mean()
data['target'] = np.where(data['r_target'] > 0, 1, 0)

                       Zero Mean - GARCH Model Results                        
Dep. Variable:                returns   R-squared:                       0.000
Mean Model:                 Zero Mean   Adj. R-squared:                  0.003
Vol Model:                      GARCH   Log-Likelihood:               -816.395
Distribution:                  Normal   AIC:                           1638.79
Method:            Maximum Likelihood   BIC:                           1650.48
                                        No. Observations:                  364
Date:                Fri, Jun 07 2024   Df Residuals:                      364
Time:                        22:34:41   Df Model:                            0
                              Volatility Model                             
                 coef    std err          t      P>|t|     95.0% Conf. Int.
---------------------------------------------------------------------------
omega          3.7937      1.436      2.642  8.241e-03    [  

In [4]:
from utils.plot import *

fig = make_subplots(rows = 2, cols = 1, shared_xaxes = True, row_heights = [0.8, 0.2], vertical_spacing = 0.01)
#shared_xaxes=True, row_heights=[0.6, 0.2, 0.2]
fig.add_trace(
    go.Candlestick(
        x = data.index , open = data.open, close = data.close,
        high = data.high, low = data.low, name = SYMBOL,
        yaxis='y1'
    ),
    col = 1, row = 1
)


add_line(fig, x=data.index, y=data['ma_1'], name='ma_1', color='blue')
add_line(fig, x=data.index, y=data['ma_2'], name='ma_2', color='orange')
add_line(fig, x=data.index, y=data['ma_3'], name='ma_3', color='black')
add_mark(fig, x=data.index, y=data['sar_up'], name='sar_up', color='blue')
add_mark(fig, x=data.index, y=data['sar_down'], name='sar_down', color='black')


add_line(fig, x=data.index, y=data['target'], name='target', color='blue', row = 2 , col = 1)
add_line(fig, x=data.index, y=data['vol'], name='vol', color='blue', row = 2 , col = 1)
add_line(fig, x=data.index, y=data['garch'], name='garch', color='green', row = 2 , col = 1)


fig.update_xaxes(rangeslider_visible = False , col =1, row =1)

fig.update_xaxes(showspikes=True)
fig.update_yaxes(showspikes=True)

#Fig = go.FigureWidget(fig)
fig.update_layout(height = 800 , width =1500,
                  margin=dict(
                        l=5, r=0, b=10, t=30,
                        pad=1
                    ),
                  showlegend=True,
                  )
fig.show()

## Optimiser Triple MM

In [5]:
from sklearn.model_selection import ParameterGrid


In [6]:
MM = {
    'a_low' : [2, 4, 6, 8],
    'b_meduim' : [9, 11, 13, 15],
    'c_high' : [16, 18, 20, 22]
}

#list(ParameterGrid(MM))

# Trend-following

## Channel Breakout

In [7]:
slow_period = 30
fast_period = 14


def calculate_donchian_channels(data, slow_period, fast_period):
    data["slow_high"] = data["high"].rolling(window = slow_period).max()
    data["slow_low"] = data["low"].rolling(window = slow_period).min()
    data["fast_high"] = data["high"].rolling(window = fast_period).max()
    data["fast_low"] = data["low"].rolling(window = fast_period).min()
    
def channel_breakout_strategy(data):
    data["long_entry"] = data["close"] > data["fast_high"]
    data["short_entry"] = data["close"] < data["fast_low"]
    data["long_exit"] = data["close"] < data["slow_low"]
    data["short_exit"] = data["close"] > data["slow_high"]
    

In [8]:
calculate_donchian_channels(data, slow_period, fast_period)

In [9]:
from utils.plot import *

#fig = make_subplots(rows = 2, cols = 1, shared_xaxes = True, row_heights = [0.8, 0.2], vertical_spacing = 0.01)
#shared_xaxes=True, row_heights=[0.6, 0.2, 0.2]
fig.add_trace(
    go.Candlestick(
        x = data.index , open = data.open, close = data.close,
        high = data.high, low = data.low, name = SYMBOL,
        yaxis='y1'
    ),
    col = 1, row = 1
)


add_line(fig, x=data.index, y=data['slow_high'], name='slow_high', color='blue')
add_line(fig, x=data.index, y=data['slow_low'], name='slow_low', color='orange')
add_line(fig, x=data.index, y=data['fast_high'], name='fast_high', color='black')
add_line(fig, x=data.index, y=data['fast_low'], name='fast_low', color='blue')
#add_mark(fig, x=data.index, y=data['fast_low'], name='fast_low', color='blue')


fig.update_xaxes(rangeslider_visible = False , col =1, row =1)

fig.update_xaxes(showspikes=True)
fig.update_yaxes(showspikes=True)

#Fig = go.FigureWidget(fig)
fig.update_layout(height = 800 , width =1500,
                  margin=dict(
                        l=5, r=0, b=10, t=30,
                        pad=1
                    ),
                  showlegend=True,
                  )
fig.show()

## Moving Average

In [10]:
def calculate_AMA_AMATR1(prices, alpha, beta):
    ama_values = []
    channels = []
    trend_directions = []
    
    for i in range(len(prices)):
        if i == 0:
            ama = prices[i]
            upper_channel = ama * (1 + beta)
            lower_channel = ama * (1 - beta)
        else:
            ama = alpha * prices[i] + (1 - alpha) * ama
            upper_channel = upper_channel * (1 + beta)
            lower_channel = lower_channel * (1 - beta)
        
        ama_values.append(ama)
        channels.append((upper_channel, lower_channel))
        
        if prices[i] > upper_channel:
            trend_directions.append("Uptrend")
        elif prices[i] < lower_channel:
            trend_directions.append("Downtrend")
        else:
            trend_directions.append("no trend")
        
        return ama_values, channels, trend_directions
    

In [11]:
def calculate_AMA_AMATR2(prices, alpha_min, alpha_max, gamma, beta_min, beta_max):
    ama_values = []
    channels = []
    trend_directions = []
    signal_to_noise_ratios = []
    
    alpha = alpha_min
    beta = beta_min
    
    for i in range(len(prices)):
        if i == 0:
            ama = prices[i]
        else:
            ama = alpha * prices[i] + (1 - alpha) * ama
        
        upper_channel = (1 + beta) * ama
        lower_channel = (1 - beta) * ama
        
        deviation_plus = alpha * max(0, prices[i] - prices[i - 1]) + (1 - alpha) * deviation_plus
        deviation_minus = alpha * max(0, prices[i - 1] - prices[i]) + (1 - alpha) * deviation_minus
        
        snr = (prices[i] - ama) / ama
        
        if prices[i] > upper_channel:
            trend_directions.append("Uptrend")
        elif prices[i] < lower_channel:
            trend_directions.append("Dowtrend")
        else:
            trend_directions.append("No Trend")
        
        ama_values.append(ama)
        channels.append((upper_channel, lower_channel))
        signal_to_noise_ratios.append(snr)
        
        if trend_directions[-1] == "Uptrend":
            alpha = alpha_min + (alpha_max - alpha_min) * np.arctan(gamma * snr)
            beta = beta_max
        elif trend_directions[-1] == "Downtrend":
            alpha = alpha_max + (alpha_max - alpha_min) * np.arctan(gamma * snr)
            beta = beta_max
        else:
            alpha = alpha_max
            beta = beta_min
        
        return ama_values, channels, trend_directions, signal_to_noise_ratios
    

## Swing Breakout

In [12]:
def calculate_swing_breakout_strategy(prices, alpha, beta, T):
    states = []
    positions = []
    profits = []
    
    state = "INITVOL"
    position = 0
    profit = 0
    rolling_volatility = 0
    
    for t in range(len(prices)):
        if state == "INITVOL":
            if t >= T:
                rolling_volatility = np.std(prices[t - T:t])
                S = prices[t] / (1 + rolling_volatility * alpha/2)
                L = prices[t] * (1 + rolling_volatility * alpha/2)
                state = "INIT"
        elif state == "INIT":
            if prices[t] > L:
                state = "LONG"
                position = 1
                PFL = prices[t] * (1 + rolling_volatility * beta)
            elif prices[t] < S:
                state = "SHORT"
                position = -1
                PFS = prices[t] / (1 + rolling_volatility * beta)
        
        elif state == "LONG":
            if prices[t] > PFL:
                state = "INIT"
                position = 0
                S = prices[t] / (1 + rolling_volatility * alpha/2)
                L = prices[t] * (1 + rolling_volatility * alpha/2)
            elif prices[t] > L:
                state = "LONG"
                position = 1
                PFL = prices[t] * (1 + rolling_volatility * beta)
        
        states.append(state)
        positions.append(position)
        profits.append(profit)
        
    return states, positions, profits

In [13]:
import pandas as pd
import numpy as np
import sqlalchemy

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from data import db


SYMBOL = "ETH"
start = "2020"
end = "2024"

DB = db.connect_db("database", interval="1d")
data = DB.get_data(SYMBOL)
data = data.loc[start:end]

# Convert all column names to plain Python strings
data.columns = [str(col) if isinstance(col, sqlalchemy.sql.elements.quoted_name) else col for col in data.columns]
print("Column names and types after conversion:")
for col in data.columns:
    print(f"Column: {col}, Type: {type(col)}")
    

def apply_triple_barrier(prices, events, profit_mult, loss_mult, time_barrier):
    """
    prices: Series des prix
    events: DataFrame avec les colonnes 't1' pour les horizons temporels et 'trgt' pour les seuils
    profit_mult, loss_mult: Multiplicateurs pour définir les seuils de profit et de perte
    time_barrier: Nombre de jours pour la barrière verticale
    """
    # Stocker les temps de toucher pour chaque barrière
    out = events[['t1']].copy(deep=True)
    out['touch_time'] = np.nan
    out['label'] = 0  # Par défaut, la barrière de temps est atteinte sans toucher les barrières de profit/perte
    
    for ix, event in events.iterrows():
        start_price = prices.loc[ix]
        end_date = event['t1']
        target = event['trgt']
        
        if pd.isna(start_price) or pd.isna(target):
            continue
        
        # Définir les seuils
        upper_barrier = start_price * (1 + profit_mult * target)
        lower_barrier = start_price * (1 - loss_mult * target)
        
        # Filtrer les prix dans la fenêtre de temps
        price_sub = prices.loc[ix:end_date]
        
        # Vérifier le toucher des barrières supérieure et inférieure
        upper_touch = price_sub[price_sub >= upper_barrier].index.min()
        lower_touch = price_sub[price_sub <= lower_barrier].index.min()
        
        # Déterminer le premier toucher
        first_touch = min(filter(pd.notna, [upper_touch, lower_touch, end_date]))
        if pd.notna(first_touch):
            out.at[ix, 'touch_time'] = first_touch
            if first_touch == upper_touch:
                out.at[ix, 'label'] = 1
            elif first_touch == lower_touch:
                out.at[ix, 'label'] = -1

    return out


# Calcul de la volatilité quotidienne avec une fenêtre glissante de 10 jours
daily_vol = data['close'].pct_change().rolling(window=10).std()


events = pd.DataFrame(index=data.index)
events['t1'] = events.index + pd.Timedelta(days=10)  # Expiration après 10 jours
events['trgt'] = daily_vol
events


labels = apply_triple_barrier(data['close'], events, profit_mult=2, loss_mult=2, time_barrier=10)
labels['label'].value_counts()

data['Y'] = labels['label']

# Calcul des Moyennes Mobiles
data['SMA_10'] = data['close'].rolling(window=10).mean()
data['SMA_20'] = data['close'].rolling(window=20).mean()

# Calcul du RSI
delta = data['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
data['RSI'] = 100 - (100 / (1 + rs))

# Calcul du MACD
exp1 = data['close'].ewm(span=12, adjust=False).mean()
exp2 = data['close'].ewm(span=26, adjust=False).mean()
data['MACD'] = exp1 - exp2
data['Signal_line'] = data['MACD'].ewm(span=9, adjust=False).mean()

# Calcul du High-Low Range
data['High_Low_Range'] = data['high'] - data['low']

# Calcul du Volume On-Balance (OBV)
obv = (np.sign(data['close'].diff()) * data['volume']).fillna(0).cumsum()
data['OBV'] = obv



data.dropna(inplace=True)

# Supposons que vous avez déjà préparé un DataFrame `labels` avec vos étiquettes
features = data[['SMA_10', 'SMA_20', 'RSI', 'MACD', 'Signal_line', 'High_Low_Range', 'OBV']]
labels = data['Y']
# Séparation en ensembles d'entraînement et de test
X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size=0.2, random_state=42)

# Normalisation des données
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)


from sklearn.metrics import accuracy_score

model = RandomForestClassifier()
model.fit(X_train_scaled, y_train)

# Prédiction et évaluation
y_pred = model.predict(X_test_scaled)
y_pred_proba = model.predict_proba(X_test_scaled)

print("Accuracy:", accuracy_score(y_test, y_pred))
print("Classification Report:")
print(classification_report(y_test, y_pred))


Column names and types after conversion:
Column: open, Type: <class 'str'>
Column: high, Type: <class 'str'>
Column: low, Type: <class 'str'>
Column: close, Type: <class 'str'>
Column: volume, Type: <class 'str'>
Accuracy: 0.726643598615917
Classification Report:
              precision    recall  f1-score   support

          -1       0.76      0.74      0.75        77
           0       0.63      0.64      0.63        81
           1       0.77      0.77      0.77       131

    accuracy                           0.73       289
   macro avg       0.72      0.72      0.72       289
weighted avg       0.73      0.73      0.73       289

