In [1]:
!pip install yfinance ta

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
import pandas as pd, numpy as np
import itertools
import seaborn as sns
import yfinance as yf

from copy import copy
import statistics as stats
import math
from functools import reduce

from ta.volume import MFIIndicator
from ta.volatility import AverageTrueRange
from ta.trend import STCIndicator
from ta.trend import EMAIndicator

# Génération du fichier source


In [3]:
ndx = yf.Ticker("^NDX")
df_historical_data = ndx.history(interval="1d", period="max")
df_historical_data.drop(columns=["Dividends","Stock Splits"], inplace=True)
df_historical_data.reset_index(inplace=True)

In [4]:
# Test si aucune ligne manquante
test_list = [champ == 0 for champ in df_historical_data.isnull().sum()]

# Si toutes les colonnes sont True, résultat = True
notnull = all(i for i in test_list)
print("Aucune ligne vide détectée, pour l'ensemble des colonnes : ", notnull)

Aucune ligne vide détectée, pour l'ensemble des colonnes :  True


In [5]:
df_historical_data["Open"] = df_historical_data.Open.apply(lambda x: round(x,2))
df_historical_data["High"] = df_historical_data.High.apply(lambda x: round(x,2))
df_historical_data["Low"] = df_historical_data.Low.apply(lambda x: round(x,2))
df_historical_data["Close"] = df_historical_data.Close.apply(lambda x: round(x,2))

In [6]:
df_historical_data.tail(1)

Unnamed: 0,Date,Open,High,Low,Close,Volume
9474,2023-05-04 00:00:00-04:00,13014.1,13064.02,12938.45,12982.48,930334883


In [7]:
df_historical_data.dtypes

Date      datetime64[ns, America/New_York]
Open                               float64
High                               float64
Low                                float64
Close                              float64
Volume                               int64
dtype: object

# Génération des combinaisons de paramètres

In [8]:
l_ema = [e for e in range(40,88,2)]
l_AT_period = [p for p in range(4,42,2)]
l_STC_length = [l for l in range(98,132,2)]
l_STC_slow = [s for s in range(98,132,2)]

In [9]:
all_params = [l_ema, l_AT_period, l_STC_length, l_STC_slow]

In [10]:
combinaisons = list(itertools.product(*all_params))

In [11]:
print("Le nombre de combinaisons possibles est : ",len(combinaisons))

Le nombre de combinaisons possibles est :  131784


# Fonctions

## Indicateurs Techniques

#### Alphatrend

In [12]:
# Trend indicator, équivalent de l'affichage couleur
def trend_indicator(trend):
    if trend > 0 :
        # Uptrend
        x = 1
    elif trend < 0 :
        # Downtrend
        x = -1
    else :
        # Range
        x = 0
    return x

In [13]:
# Defintion fonction
def generate_alphatrend(df_in, mfi_p, mfi_seuil, atr_l, m):
    '''Paramètres d'entrée : longueur MFI, longueur ATR, multiplier
    Retourne les colonnes Alphatrend, Alphatrend +2, Trend (position AT1 / AT2)
    :mfi_p = période MFI servant à délimiter up/down de l'alphatrend
    :mfi_seuil = période MFI pour recherche crossover, détermine uptrend ou downtrend'''

    df = df_in.copy()

    # Colonnes MFI
    s_mfi = MFIIndicator(high=df.High, low=df.Low, close=df.Close, volume=df.Volume, window=mfi_p).money_flow_index()
    df["MFI_ref"] = s_mfi

    # Colonne ATR
    s_atr = AverageTrueRange(high=df.High, low=df.Low, close=df.Close, window=atr_l).average_true_range()
    df["ATR"] = s_atr

    # Lignes UpT et DownT
    df["UpT_support"] = df["Low"] - df["ATR"] * m
    df["DownT_support"] = df["High"] + df["ATR"] * m

    # Suppression des lignes sans signal, en début de DataFrame
    df.dropna(inplace=True)
    df.reset_index(drop=True, inplace=True)

    # ===============================================
    # Calcul Alphatrend, en tant que série
    
    Alphatrend = [0]

    for i in range (1, df.shape[0]):
        # Cas Uptrend
        if df.at[i,"MFI_ref"] >= mfi_seuil :
            if df.at[i,"UpT_support"] < Alphatrend[-1] :
                # Flat
                Alphatrend.append(Alphatrend[-1])
            else :
                # Trailing stop loss Up
                Alphatrend.append(df.at[i,"UpT_support"])

        # Cas Downtrend, MFI < 50
        else :
            if df.at[i,"DownT_support"] > Alphatrend[-1] :
                # Flat
                Alphatrend.append(Alphatrend[-1])
            else :
                # Trailing stop loss Down
                Alphatrend.append(df.at[i,"DownT_support"])

    # ===============================================
    # Ajout des lignes k1 et k2 en tant que colonnes
    
    if df.shape[0] == len(Alphatrend):
        df["Alphatrend_k1"] = pd.Series(Alphatrend).apply(lambda x: round(x,2))
        # Ligne k2 décalée de 2j
        Alphatrend2 = df["Alphatrend_k1"].shift(periods=2, fill_value=0)
        df["Alphatrend_k2"] = pd.Series(Alphatrend2).apply(lambda x: round(x,2))
        # Trend
        df["Trend"] = df.Alphatrend_k1 - df.Alphatrend_k2
        df["Trend"] = df["Trend"].apply(trend_indicator)
    else :
        print("Erreur lors de la génération des lignes Alphatrend")

    # ===============================================
    # Génération des signaux Achat / Vente

    # On isole tous les index non neutres, où AT1 != AT2, à la hausse (1) comme à la baisse (-1)
    s_trend = df["Trend"].loc[df["Trend"]!=0]
    s_trend_diff = s_trend - s_trend.shift(1)

    buy_signal_indexes = s_trend_diff[s_trend_diff == 2].index
    sell_signal_indexes = s_trend_diff[s_trend_diff == -2].index

    df["Signal"] = 0
    df.loc[buy_signal_indexes,"Signal"] = 1
    df.loc[sell_signal_indexes,"Signal"] = -1

    # ===============================================
    # Sélection des colonnes suffisantes
    df = df[["Date","Alphatrend_k1","Alphatrend_k2","Trend","Signal"]]
    
    return df

#### STC & EMA

In [14]:
def generate_STC_and_EMA(df_in, stc_length, fast_length, slow_length, ema_period):
  
  df = df_in[["Date","Close"]].copy()

  s_stc = STCIndicator(close=df.Close, window_slow=slow_length, window_fast=fast_length, cycle=stc_length).stc()
  s_ema = EMAIndicator(close=df.Close, window=ema_period).ema_indicator()

  df["STC"] = round(s_stc,2)
  df["EMA"] = round(s_ema,2)

  df.drop(columns=["Close"], inplace=True)

  return df

#### ATR sortie & Merge tous indicateurs techniques

In [15]:
def merge_technical_indicators(df_in, atr_l, df1, df2, date_min="1998-01-01"):
  ''' Fusionne les DataFrames d'indicateurs techniques, 
  ajoute également date_min au format 'yyyy-mm-dd' pour fixer le début du Backtesting'''

  df_essentials = df_in.copy()

  # Ajout de la colonne ATR qui servira plus tard dans le calcul de la sortie.
  s_atr = AverageTrueRange(high=df_essentials.High, low=df_essentials.Low, close=df_essentials.Close, window=atr_l).average_true_range()
  df_essentials["ATR"] = pd.Series(s_atr).apply(lambda x: round(x,2))
  
  # Réduction au strict nécessaire pour les colonnes
  df_essentials = df_essentials[["Date","Open","Close","ATR"]].copy()

  # Merge des 3
  data_frames = [df_essentials, df1, df2]
  df_merged = reduce(lambda  left,right: pd.merge(left,right, on=['Date'], how='left'), data_frames)

  # Réduction de la fenêtre de tests à partir de la date_min
  df_merged = df_merged.loc[df_merged["Date"] >= date_min]

  df_merged.reset_index(drop=True,inplace=True)

  return df_merged


## Entrées / Sorties 

#### Détection des entrées

Stratégie :<br>
<li>Entreée : Buy signal + Prix > EMA + STC < seuil(25)</li>
<li>Sortie : Sell signal + Prix < EMA + STC > seuil(75)</li>

#### Sorties, valeurs & aggrégation

In [16]:
def generate_exit_vars(df_in, entry_index, atr_sl, rr_ratio):
  ''' Obtient l'Open de l'index d'entrée.
  Puis génère la valeur du Stop Loss et Take Profit'''

  df = df_in.copy()

  valeur_entree = df.at[entry_index,"Open"]
  date_entree = df.at[entry_index,"Date"]
  atr_reference = df.at[entry_index,"ATR"]

  stop_loss = valeur_entree + atr_sl * atr_reference
  take_profit = valeur_entree - rr_ratio * (atr_sl * atr_reference)

  return date_entree, valeur_entree, stop_loss, take_profit

In [17]:
def return_SL_or_TP_index(df_in, entry_index, stop_loss, take_profit) :
  '''Serie des valeurs close entre entrée et fin du DataFrame.
  Si Close > Stop loss, dans le cas d'un Short, alors touché.
  On cherche alors l'index de la première valeur >=0, si il existe'''

  df = df_in.copy()

  last_line = max(df.index)

  # Recherche index Stop Loss, ou index dernière ligne df
  close_series = (df.loc[entry_index:last_line,"Close"] - stop_loss) >= 0
  if len(close_series[ close_series == True ]) > 0 :
    sl_index = close_series[ close_series == True ].index[0]
  else :
    # Pour la dernière entrée, si rien n'est touché, alors la dernière ligne fera office de sortie
    sl_index = max(df.index)

  # Recherche indexTake Profit
  close_series = (take_profit - df.loc[entry_index:last_line,"Close"]) >= 0
  if len(close_series[ close_series == True ]) > 0 :
    tp_index = close_series[ close_series == True ].index[0]
  else :
    tp_index = max(df.index)
 
  # Sélection du 1er index touché : Stop Loss ou Take Profit
  lowest_index = min(sl_index, tp_index)
  
  # Pointeur des valeurs Date sortie & Prix de clôture dans le DataFrame indicateurs techniques
  exit_date = df.at[lowest_index,"Date"]
  exit_value = df.at[lowest_index,"Close"]

  return exit_date, exit_value

# Strategy as Class

In [30]:
class Strat_AT_STC_EMA:
  
  def __init__(self, p_ema=200, p_AT_m=1, p_AT_l=14, p_AT_mfi_l = 14, p_AT_mfi_s = 50, p_STC_l=80, p_STC_slow_l=50, p_STC_fast_l=27, p_STC_b=25, p_STC_h=75, p_ATR_SL_l = 14, p_ATR_SL = 2, p_RR_ratio = 3, p_leverage=1):
    self.ema_l = p_ema
    self.at_m = p_AT_m
    self.at_l = p_AT_l
    self.at_mfi_l = p_AT_mfi_l
    self.at_mfi_s = p_AT_mfi_s
    self.stc_l = p_STC_l
    self.stc_s_l = p_STC_slow_l
    self.stc_f_l = p_STC_fast_l
    self.stc_seuil_b = p_STC_b
    self.stc_seuil_h = p_STC_h
    self.ATR_SL_l = p_ATR_SL_l
    self.ATR_SL = p_ATR_SL
    self.RR_ratio = p_RR_ratio
    self.leverage = p_leverage

  
  def make_technical_indicators(self, df_source):
    df_AT = generate_alphatrend(df_source, mfi_p=self.at_mfi_l, mfi_seuil=self.at_mfi_s, atr_l=self.at_l, m=self.at_m)
    df_STC_EMA = generate_STC_and_EMA(df_source, stc_length=self.stc_l, fast_length=self.stc_f_l, slow_length=self.stc_s_l, ema_period=self.ema_l)
    df_Technical_Indicators = merge_technical_indicators(df_source, self.ATR_SL_l, df_AT, df_STC_EMA)
    return df_Technical_Indicators


  def get_entries_signals(self, df_in):
    ''' Nécessite en entrée le DataFrame avec indicateurs techniques.
    L'enrichit avec signaux Entrée Long (1) et Entrée Short (-1).'''

    df_IT = df_in.copy()

    # Valeur 3 pour signaux d'entrée valides
    df_IT["Buy_entry"] = np.sign(df_IT.Close - df_IT.EMA) + df_IT.Signal + np.sign(self.stc_seuil_b - df_IT.STC)
    # Valeur -3 pour signaux d'entrée valides
    # Attention / par deux signaux négatifs -> positif, d'où l'inversion sur un seul champ
    df_IT["Sell_entry"] = np.sign(df_IT.Close - df_IT.EMA) + df_IT.Signal + np.sign(self.stc_seuil_h - df_IT.STC)

    # Conversion en np array
    arr_buy_entry = df_IT["Buy_entry"].to_numpy()
    # np.where(condition, vrai, sinon)
    df_IT["Buy_entry"] = np.where(arr_buy_entry==3, 1, 0)

    arr_sell_entry = df_IT["Sell_entry"].to_numpy()
    df_IT["Sell_entry"] = np.where(arr_sell_entry==-3.0, -1, 0)
    
    # Agrégation des deux types de signaux.
    df_IT["Entry"] = df_IT["Sell_entry"] + df_IT["Buy_entry"]
    df_IT.drop(columns=["Buy_entry","Sell_entry"], inplace=True)

    return df_IT

  
  def apply_short_strategy(self, df_entries):
    ''' Entrée : DataFrame avec indicateurs techniques généré par get_entries_signals
    Sortie : DataFrame avec entrée - sortie, valeur & date'''

    df = df_entries.copy()
    d_entrees_sorties = {
        "date_entree" : [],
        "valeur_entree" : [],
        "date_sortie" : [],
        "valeur_sortie" : []
    }

    # Liste des entrées : -1 pour signal Short
    short_entries_indexes = df.loc[ df["Entry"]==-1 ].index
    # Ajout d'une unité pour entrée le lendemain du signal confirmé et clos
    short_entries_indexes += 1
    short_entries_indexes = short_entries_indexes.to_list()

    # Application des deux fonctions précédentes pour recherche Date + Valeur, d'entrée et sortie
    for entry in short_entries_indexes:
      # Calcul stop loss, take profit, et stocke Date + Open correspondants à l'index d'entrée passé dans la fonction
      entry_date, entry_price, sl, tp = generate_exit_vars(df, entry, self.ATR_SL, self.RR_ratio)
      # Détetion de Date + Close de la sortie, identifée avec Take Profit et stop loss trouvés ligne précédente
      exit_date, exit_price = return_SL_or_TP_index(df, entry, sl, tp)
      
      # Ajout des résultats dans le dictionnaire
      d_entrees_sorties["date_entree"].append(entry_date)
      d_entrees_sorties["valeur_entree"].append(entry_price)
      d_entrees_sorties["date_sortie"].append(exit_date)
      d_entrees_sorties["valeur_sortie"].append(exit_price)
    
    df_es = pd.DataFrame(d_entrees_sorties)
    return df_es
  

  def performance_metrics_short(self, df_es):
    ''' Sur la base du DataFrame Entrées/Sorties, génère les performances pour chaque trade, l'équity curve base 1000,
    le Win Rate, Expectancy Ratio, Expectancy, Profit Factor, Gain & Perte moyenne, et copie l'état des paramètres'''

    df = df_es.copy()

    # Performance en % pour chaque trade pris (par ligne). Signe - car stratégie Short
    s_perf = round( -((df["valeur_sortie"]-df["valeur_entree"])/df["valeur_entree"]), 3)
    a_perf = np.array(s_perf)

    # Win Rate
    count_won = len( a_perf[a_perf > 0] )
    count_lost = len(a_perf) - count_won
    win_rate = round( count_won/len(a_perf) ,2)

    # Reward-to-Risk Ratio X Win Ratio - Loss Ratio = Expectancy Ratio
    expectancy_ratio = round( self.RR_ratio * count_won/len(a_perf) - count_lost/len(a_perf) ,2)

    # Equity Curve, base 1000
    l_perf = s_perf.to_list()
    perf_nette_b1000 = [round( (999*(1+x*self.leverage)-1000)*0.99 ,2) for x in l_perf]

    # Gain & perte moyenne (arrondi entier)
    arr_b1000 = np.array(perf_nette_b1000)
    moyenne_gains_nets = int( np.mean(arr_b1000[ arr_b1000>0 ]) )
    moyenne_pertes_nettes = int( np.mean(arr_b1000[ arr_b1000<0 ]) )
    
    # Expectancy (arrondi entier)
    expectancy = int( (count_won/len(a_perf) * moyenne_gains_nets) - (count_lost/len(a_perf) * moyenne_pertes_nettes) )

    # Profit Factor : sommes des gains / somme des pertes
    profit_factor = round( np.sum(arr_b1000[ arr_b1000>0 ]) / np.sum( np.abs(arr_b1000[ arr_b1000<0 ]) ) ,2)

    # Tous les attributs de l'object au moment du test
    #d_attr = x.__dict__
    d_attr = self.__dict__

    row = [win_rate, moyenne_gains_nets, moyenne_pertes_nettes, expectancy, expectancy_ratio, profit_factor, perf_nette_b1000, d_attr]
    return row

# Test

In [31]:
x = Strat_AT_STC_EMA()

#### Comparaison avec notebook backtesting
Valeurs pour short :
<li> AT multiplier = 0.2</li>
<li> AT ATR lenght = 13</li>
<li>STC Length = 130</li>
<li>STC Fast Length = 25</li>
<li>STC Slow Length = 125</li>
<li>EMA length = 70</li>

In [32]:
# Mise à jour des paramètres de l'objet
x.at_m = 0.2
x.at_l = 13
x.stc_l = 130
x.stc_f_l = 25
x.stc_s_l = 125
x.ema_l = 70

In [33]:
%time
df_indicateurs_techniques = x.make_technical_indicators(df_historical_data)
df_signaux = x.get_entries_signals(df_indicateurs_techniques)
df_entrees_sorties = x.apply_short_strategy(df_signaux)

CPU times: user 5 µs, sys: 0 ns, total: 5 µs
Wall time: 11.7 µs


In [34]:
%time
ligne_resultat = x.performance_metrics_short(df_entrees_sorties)

CPU times: user 4 µs, sys: 0 ns, total: 4 µs
Wall time: 7.63 µs


In [35]:
ligne_resultat

[0.41,
 208,
 -48,
 113,
 0.64,
 2.98,
 [305.6,
  308.57,
  301.65,
  259.12,
  223.52,
  -75.17,
  -80.11,
  -24.73,
  -81.1,
  -48.46,
  -43.52,
  -27.69,
  -34.62,
  -42.53,
  -34.62,
  105.82,
  -45.5,
  85.05,
  156.26,
  130.55,
  -49.45,
  -42.53],
 {'ema_l': 70,
  'at_m': 0.2,
  'at_l': 13,
  'at_mfi_l': 14,
  'at_mfi_s': 50,
  'stc_l': 130,
  'stc_s_l': 125,
  'stc_f_l': 25,
  'stc_seuil_b': 25,
  'stc_seuil_h': 75,
  'ATR_SL_l': 14,
  'ATR_SL': 2,
  'RR_ratio': 3,
  'leverage': 1}]