In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import mplfinance as mpf
from datetime import timedelta

def performance_report_gen(tradelist):
    """Summarise a series of trades as a one-row DataFrame.

    ``tradelist`` is handed unchanged to every metric helper; the
    "Profit" entry is the builtin ``sum`` over its elements.

    Returns:
        DataFrame with index ``[0]`` and the columns Profit, MaxDD,
        Operations, AverageTrade, ProfitFactor, PctWin, Kestner_Ratio.
    """
    # Keyword order fixes both the column order and the order in which the
    # metric helpers are evaluated.
    metrics = dict(
        Profit=sum(tradelist),
        MaxDD=max_draw_down(tradelist),
        Operations=operation_number(tradelist),
        AverageTrade=avg_trade(tradelist),
        ProfitFactor=profit_factor(tradelist),
        PctWin=percent_win(tradelist),
        Kestner_Ratio=kestner_ratio_daily(tradelist),
    )
    return pd.DataFrame(data=metrics, index=[0])

def kestner_ratio_daily(operations):
    """Return slope / (slope standard error * N) of the cumulative equity.

    The equity curve is ``operations.cumsum()``; it is regressed with
    ``scipy.stats.linregress`` against the bar counter 1..N, where N is
    ``operations.count()`` (the number of non-NaN entries).

    Args:
        operations: pandas Series of per-trade (or per-day) results.
            NOTE(review): if the series contains NaN, N is smaller than the
            length of the equity curve and ``linregress`` raises; callers in
            this file strip NaN/zero entries first - confirm for new callers.

    Returns:
        The ratio rounded to 2 decimals, or ``np.inf`` when there are 3 or
        fewer operations or the regression standard error is exactly 0.
    """
    # Function-scope imports kept so the block works wherever it is pasted;
    # the previously imported (and unused) matplotlib is no longer required.
    import numpy as np
    from scipy import stats

    if len(operations) <= 3:
        return np.inf

    equity = operations.cumsum()
    n_obs = operations.count()
    x = np.arange(1, n_obs + 1)
    gradient, _intercept, _r_value, _p_value, std_err = stats.linregress(x, equity)

    if std_err == 0 or n_obs <= 0:
        return np.inf
    return round(gradient / (std_err * n_obs), 2)
    
def performance_report_fast(tradelist,
                       initial_capital,
                       risk_free_rate,
                       interactive):
    """Print a performance report for ``tradelist`` and draw its charts.

    Args:
        tradelist: object exposing ``.empty`` and an ``.operations`` series
            (presumably a DataFrame with an "operations" column - confirm
            against callers).
        initial_capital: forwarded to the CAGR, annual return, Sharpe and
            Sortino helpers and echoed in the printed lines.
        risk_free_rate: forwarded as-is to the Sharpe helper and divided by
            12 for the Sortino helper.
        interactive: when equal to ``False`` the static plot helpers are
            used; any other value selects the ``*_interactive`` helpers.

    Returns:
        None. All output goes to stdout and to the plotting helpers.

    Every metric/plot function called here is defined outside this block.
    """
    print("*************************************************************************")
    print("*** Performance Report Fast - Copyright 2020 - by Gandalf Project R&D ***")
    print("*************************************************************************")
    if tradelist.empty:
        # Nothing to report: print the "no operations recorded" message and stop.
        print("")
        print("Nessuna operazione registrata!")
        return
    else: 
        print("")
        print("Compound Annual Growth Rate CAGR: " + str(cagr(tradelist.operations,initial_capital)) + " (capital = " + str(initial_capital) + ")")
        print("Annual Return: " + str(annual_return(tradelist.operations,initial_capital)) + " (capital = " + str(initial_capital) + ")")
        print("")
        print("Calmar Ratio (yearly):", calmar_ratio(tradelist.operations))
        print("Sharpe Ratio: " + str(sharpe_ratio(tradelist.operations, initial_capital, risk_free_rate)) + " (capital = " + str(initial_capital) + ", risk free rate = " + str(risk_free_rate) + ")")
        # Sortino receives risk_free_rate / 12 while Sharpe receives the raw rate.
        print("Sortino Ratio: " + str(sortino_ratio(tradelist.operations, initial_capital, risk_free_rate / 12)) + " (capital = " + str(initial_capital) + ", risk free rate = " + str(risk_free_rate / 12) + ")")
        # Omega threshold is hard-coded to 100.
        print("Omega Ratio: " + str(omega_ratio(tradelist.operations,100)) + " (threshold = 100)") 
        # NOTE(review): calls kestner_ratio, not the kestner_ratio_daily defined in this file.
        print("Kestner Ratio:", kestner_ratio(tradelist.operations))
        print("")
        print("Profit:                  ", int(tradelist.operations.sum()))
        print("Operations:              ", operation_number(tradelist.operations))
        print("Average Trade:           ", avg_trade(tradelist.operations))
        print("")
        print("Profit Factor:           ", profit_factor(tradelist.operations))
        print("Gross Profit:            ", gross_profit(tradelist.operations))
        print("Gross Loss:              ", gross_loss(tradelist.operations))
        print("")
        print("Percent Winning Trades:  ", percent_win(tradelist.operations))
        print("Percent Losing Trades:   ", round(100 - percent_win(tradelist.operations),2))
        print("Reward Risk Ratio:       ", reward_risk_ratio(tradelist.operations))
        print("")
        print("Max Gain:                ", max_gain(tradelist.operations), " in date ", max_gain_date(tradelist.operations))
        print("Average Gain:            ", avg_gain(tradelist.operations))
        print("Max Loss:                ", max_loss(tradelist.operations), " in date ", max_loss_date(tradelist.operations))
        print("Average Loss:            ", avg_loss(tradelist.operations))
        print("")
        # The equity curve is the running sum of the operations; it is
        # recomputed for every helper below.
        print("Avg Delay Between Peaks: ", avg_delay_between_peaks(tradelist.operations.cumsum()))
        print("Max Delay Between Peaks: ", max_delay_between_peaks(tradelist.operations.cumsum()))
        print("")
        print("Trades Standard Deviation: ", tradelist.operations.std())
        print("Equity Standard Deviation: ", tradelist.operations.cumsum().std())
        print("")
        # NOTE(review): the "Open" and "Closed" draw-down lines evaluate the
        # exact same expressions, so they always print identical numbers.
        print("Avg Open Draw Down:      ", avgdrawdown_nozero(tradelist.operations.cumsum()))
        print("Max Open Draw Down:      ", max_draw_down(tradelist.operations.cumsum()))
        print("")
        print("Avg Closed Draw Down:    ", avgdrawdown_nozero(tradelist.operations.cumsum()))
        print("Max Closed Draw Down:    ", max_draw_down(tradelist.operations.cumsum()))
        print("")
        print("Draw Down Statistics: ", drawdown_statistics(tradelist.operations.cumsum()))
        print("")
        
        print("Operations Statistics:\n")

        if interactive == False:
            plot_equity(tradelist.operations.cumsum(),"green")
            # NOTE(review): `dataset` is not a parameter; it is read from the
            # enclosing (module/notebook) scope.
            plot_equity_price(dataset, tradelist.operations.cumsum())
            plot_drawdown(tradelist.operations.cumsum(),"red")
            plot_annual_histogram(tradelist.operations)
            plot_monthly_bias_histogram(tradelist.operations)
            plot_equity_heatmap(tradelist.operations,False)

        else:
            plot_equity_interactive(tradelist.operations.cumsum(),"green")
            plot_drawdown_interactive(tradelist.operations.cumsum())
            plot_annual_histogram_interactive(tradelist.operations)
            plot_monthly_bias_histogram_interactive(tradelist.operations)
            plot_equity_heatmap_interactive(tradelist.operations,False)

        return

# `jit` is used as a decorator right below but was never imported before this
# point of the file.
from numba import jit

@jit(nopython=True)
def engine(pct,pct_total,ptn_body,ptn_range,
           np_open, np_high, np_low, np_close, direzione,lookback,n_days):
    """Scan the price arrays for windows matching a pattern and record the trade result.

    For every bar ``i`` the previous ``lookback`` bars are min-max normalised
    (candle body = close - open, candle range = high - low) and compared
    element-wise with ``ptn_body`` / ``ptn_range``. A window matches when at
    least ``pct_total`` elements of BOTH comparisons are hits.

    Args:
        pct: tolerance; an element is a hit when |difference| < pct.
        pct_total: minimum number of hits required per comparison.
        ptn_body, ptn_range: reference pattern arrays (presumably of length
            ``lookback`` - confirm against the caller).
        np_open, np_high, np_low, np_close: price arrays of equal length.
        direzione: "UP" (long result) or "DOWN" (short result); any other
            value records nothing.
        lookback: number of bars in the compared window.
        n_days: holding period in bars.

    Returns:
        (gain_loss, ls_date): ``gain_loss`` holds one single-element list per
        match with the open-to-open result between bar ``i+1`` and bar
        ``i+n_days+1``; ``ls_date`` holds the matching bar indices ``i``.

    Windows whose body or range is constant divide by zero, become NaN and
    therefore never produce hits.
    """
    gain_loss=[]
    ls_date=[]
    # The exit price is read at index i+n_days+1, so i must stop at
    # len-n_days-2. The previous bound allowed i+n_days+1 == len(np_open);
    # nopython mode does not bounds-check, so that read returned garbage
    # memory as the last trade instead of raising.
    for i in range(lookback+1, len(np_open)-n_days-1):
        op=np_open[i-lookback:i]
        hi=np_high[i-lookback:i]
        lo=np_low[i-lookback:i]
        cl=np_close[i-lookback:i]

        a=cl-op
        body=(a - np.min(a))/np.ptp(a)

        a=hi-lo
        range_=(a - np.min(a))/np.ptp(a)

        diff_1=body-ptn_body
        diff_2=range_-ptn_range
        # Turn the differences into 1 (hit) / 0 (miss) flags.
        # NOTE(review): 1 doubles as the hit marker, so a raw difference of
        # exactly 1.0 is also counted as a hit. Kept unchanged so results stay
        # aligned with the generate_signal* functions - confirm intent.
        diff_1=np.where((diff_1<pct)&(diff_1>-pct),1,diff_1)
        diff_1=np.where(diff_1!=1,0,diff_1)
        diff_2=np.where((diff_2<pct)&(diff_2>-pct),1,diff_2)
        diff_2=np.where(diff_2!=1,0,diff_2)

        # Evaluate the match once instead of once per direction.
        matched=(my_sum(diff_1)>=pct_total) and (my_sum(diff_2)>=pct_total)
        if matched and direzione=="UP":
            tmp_ls=[]
            gl_1=np_open[i+n_days+1] - np_open[i+1]
            tmp_ls.append(gl_1)
            gain_loss.append(tmp_ls)
            ls_date.append(i)
        if matched and direzione=="DOWN":
            tmp_ls=[]
            gl_1=np_open[i+1] - np_open[i+n_days+1]
            tmp_ls.append(gl_1)
            gain_loss.append(tmp_ls)
            ls_date.append(i)
    return gain_loss , ls_date

In [None]:
from numba import jit

@jit(nopython=True)
def my_sum(d):
    """Add up the items of ``d`` one by one, starting from the float 0.0."""
    acc = 0.0
    for item in d:
        acc = acc + item
    return acc


from numba import jit
@jit(nopython=True)
def engine(pct,pct_total,ptn_body,ptn_range,
           np_open, np_high, np_low, np_close, direzione,lookback,n_days):
    """Scan the price arrays for windows matching a pattern and record the trade results.

    For every bar ``i`` the previous ``lookback`` bars are min-max normalised
    (candle body = close - open, candle range = high - low) and compared
    element-wise with ``ptn_body`` / ``ptn_range``. A window matches when at
    least ``pct_total`` elements of BOTH comparisons are hits.

    Args:
        pct: tolerance; an element is a hit when |difference| < pct.
        pct_total: minimum number of hits required per comparison.
        ptn_body, ptn_range: reference pattern arrays (presumably of length
            ``lookback`` - confirm against the caller).
        np_open, np_high, np_low, np_close: price arrays of equal length.
        direzione: "UP" (long results) or "DOWN" (short results); any other
            value records nothing.
        lookback: number of bars in the compared window.
        n_days: maximum holding period in bars.

    Returns:
        (gain_loss, ls_date): for each match ``gain_loss`` holds a list of
        ``n_days`` open-to-open results, entry at bar ``i+1`` and exit at bar
        ``i+n+1`` for n = 1..n_days; ``ls_date`` holds the matching indices.

    Windows whose body or range is constant divide by zero, become NaN and
    therefore never produce hits.
    """
    gain_loss=[]
    ls_date=[]
    # The furthest exit price is read at index i+n_days+1, so i must stop at
    # len-n_days-2. The previous bound allowed i+n_days+1 == len(np_open);
    # nopython mode does not bounds-check, so that read returned garbage
    # memory for the last candidate bar instead of raising.
    for i in range(lookback+1, len(np_open)-n_days-1):
        op=np_open[i-lookback:i]
        hi=np_high[i-lookback:i]
        lo=np_low[i-lookback:i]
        cl=np_close[i-lookback:i]

        a=cl-op
        body=(a - np.min(a))/np.ptp(a)

        a=hi-lo
        range_=(a - np.min(a))/np.ptp(a)

        diff_1=body-ptn_body
        diff_2=range_-ptn_range
        # Turn the differences into 1 (hit) / 0 (miss) flags.
        # NOTE(review): 1 doubles as the hit marker, so a raw difference of
        # exactly 1.0 is also counted as a hit. Kept unchanged so results stay
        # aligned with the generate_signal* functions - confirm intent.
        diff_1=np.where((diff_1<pct)&(diff_1>-pct),1,diff_1)
        diff_1=np.where(diff_1!=1,0,diff_1)
        diff_2=np.where((diff_2<pct)&(diff_2>-pct),1,diff_2)
        diff_2=np.where(diff_2!=1,0,diff_2)

        # Evaluate the match once instead of once per direction.
        matched=(my_sum(diff_1)>=pct_total) and (my_sum(diff_2)>=pct_total)
        if matched and direzione=="UP":
            tmp_ls=[]
            for n in range(1,n_days+1):
                gl_1=np_open[i+n+1] - np_open[i+1]
                tmp_ls.append(gl_1)
            gain_loss.append(tmp_ls)
            ls_date.append(i)
        if matched and direzione=="DOWN":
            tmp_ls=[]
            for n in range(1,n_days+1):
                gl_1=np_open[i+1] - np_open[i+n+1]
                tmp_ls.append(gl_1)
            gain_loss.append(tmp_ls)
            ls_date.append(i)
    return gain_loss , ls_date


@jit(nopython=True)
def search_pattern(np_open,np_high,np_low,np_close,n_days,lookback,fraction_movement):
    """Collect the candle patterns that precede a large open-to-open move.

    For each bar ``i`` the open of bar ``i+1`` is compared with the opens of
    bars ``i+2 .. i+n_days+1``. The first horizon ``n`` at which the move
    reaches ``fraction_movement`` times the entry open ends the search for
    that bar; a drop is checked before a rise.

    When a move is found and enough history exists, the min-max normalised
    body (close - open) and range (high - low) of the ``lookback`` bars
    before ``i`` are stored as the pattern.

    Returns:
        (up_ls, down_ls, position_up, position_down) where ``up_ls`` /
        ``down_ls`` are lists of ``[body, range_]`` and ``position_*`` are
        lists of ``[i, n]``. ``position_x[k]`` always describes ``x_ls[k]``.

    Changes with respect to the previous version:
      * Future bars are bounds-checked explicitly. nopython mode performs no
        bounds checking, so the old ``try/except`` never fired and reads past
        the end of the array returned garbage.
      * A position is recorded only together with its pattern. Previously
        positions were also appended for bars without enough history, which
        shifted ``position_*`` relative to ``*_ls``.

    NOTE(review): down patterns need ``i >= lookback`` while up patterns need
    ``i > lookback``; the asymmetry is kept as found - confirm intent.
    """
    n_bars=len(np_open)
    down_ls=[]
    up_ls=[]
    position_down=[]
    position_up=[]
    for i in range(n_bars):
        for n in range(1,n_days+1):
            # Horizons only grow, so once one falls off the array we are done.
            if i+n+1 >= n_bars:
                break
            entry=np_open[i+1]
            exit_=np_open[i+n+1]
            if entry - exit_ >= fraction_movement*entry:
                if i >= lookback:
                    a=np_close[i-lookback:i]-np_open[i-lookback:i]
                    body=(a - np.min(a))/np.ptp(a)
                    a=np_high[i-lookback:i]-np_low[i-lookback:i]
                    range_=(a - np.min(a))/np.ptp(a)
                    down_ls.append([body,range_])
                    position_down.append([i,n])
                break
            elif exit_ - entry >= fraction_movement*entry:
                if i > lookback:
                    a=np_close[i-lookback:i]-np_open[i-lookback:i]
                    body=(a - np.min(a))/np.ptp(a)
                    a=np_high[i-lookback:i]-np_low[i-lookback:i]
                    range_=(a - np.min(a))/np.ptp(a)
                    up_ls.append([body,range_])
                    position_up.append([i,n])
                break
    return up_ls , down_ls , position_up , position_down

   
def generate_signal(dataset,lookback,pct_accettata,fraction_movement,n_patterny,direzione):
    """Flag the bars that follow a window matching a stored pattern (with progress bar).

    Args:
        dataset: DataFrame with ``open``, ``high``, ``low``, ``close`` columns.
        lookback: number of bars in each compared window.
        pct_accettata: acceptance percentage; the tolerance is
            ``round(1 - pct_accettata/100, 1)`` and the required number of
            hits is ``int(lookback/100 * pct_accettata)``.
        fraction_movement: unused, kept for interface compatibility.
        n_patterny: index of the pattern inside the module-level ``up_ls``
            ("long") or ``down_ls`` ("short").
        direzione: "long" or "short".

    Returns:
        Integer Series named "signal" (1 = window ending on the previous bar
        matched) sharing the index of ``dataset``, or None for any other
        ``direzione``.

    The signal is now written into a numpy buffer by position. The old chained
    assignment ``data["signal"][k] = 1`` is silently ignored under pandas
    Copy-on-Write and relied on positional fallback for integer keys.
    """
    if direzione == "long":
        patterns=up_ls
    elif direzione == "short":
        patterns=down_ls
    else:
        return None

    pct=round(1-float((1/100)*pct_accettata),1)
    pct_total=int((lookback/100)*pct_accettata)

    ptn_body=patterns[n_patterny][0]
    ptn_range=patterns[n_patterny][1]

    dopen=dataset.open.values
    dhigh=dataset.high.values
    dlow=dataset.low.values
    dclose=dataset.close.values
    signal=np.zeros(len(dataset),dtype=np.int64)

    # Flat windows divide by zero and turn into NaN, which never counts as a
    # hit; only the numpy warnings are silenced here.
    with np.errstate(divide="ignore",invalid="ignore"):
        for e in tqdm(range(len(dataset)-lookback)):
            a=dclose[e:e+lookback]-dopen[e:e+lookback]
            body=(a - np.min(a))/np.ptp(a)

            a=dhigh[e:e+lookback]-dlow[e:e+lookback]
            range_=(a - np.min(a))/np.ptp(a)

            diff_1=body-ptn_body
            diff_2=range_-ptn_range
            # NOTE(review): a difference of exactly 1 also counts as a hit,
            # because the previous code used 1 as its hit marker. Kept so the
            # signals stay aligned with `engine` - confirm intent.
            hits_1=np.count_nonzero((np.abs(diff_1)<pct)|(diff_1==1))
            hits_2=np.count_nonzero((np.abs(diff_2)<pct)|(diff_2==1))
            if hits_1>=pct_total and hits_2>=pct_total:
                signal[e+lookback]=1

    return pd.Series(signal,index=dataset.index,name="signal")
    
def generate_signal2(dataset,lookback,pct_accettata,fraction_movement,n_patterny,direzione):
    """Flag the bars that follow a window matching a stored pattern.

    Args:
        dataset: DataFrame with ``open``, ``high``, ``low``, ``close`` columns.
        lookback: number of bars in each compared window.
        pct_accettata: acceptance percentage; the tolerance is
            ``round(1 - pct_accettata/100, 1)`` and the required number of
            hits is ``int(lookback/100 * pct_accettata)``.
        fraction_movement: unused, kept for interface compatibility.
        n_patterny: index of the pattern inside the module-level ``up_ls``
            ("long") or ``down_ls`` ("short").
        direzione: "long" or "short".

    Returns:
        Integer Series named "signal" (1 = window ending on the previous bar
        matched) sharing the index of ``dataset``, or None for any other
        ``direzione``.

    The signal is now written into a numpy buffer by position. The old chained
    assignment ``data["signal"][k] = 1`` is silently ignored under pandas
    Copy-on-Write and relied on positional fallback for integer keys.
    """
    if direzione == "long":
        patterns=up_ls
    elif direzione == "short":
        patterns=down_ls
    else:
        return None

    pct=round(1-float((1/100)*pct_accettata),1)
    pct_total=int((lookback/100)*pct_accettata)

    ptn_body=patterns[n_patterny][0]
    ptn_range=patterns[n_patterny][1]

    dclose=dataset.close.values
    dopen=dataset.open.values
    dhigh=dataset.high.values
    dlow=dataset.low.values
    signal=np.zeros(len(dataset),dtype=np.int64)

    # Flat windows divide by zero and turn into NaN, which never counts as a
    # hit; only the numpy warnings are silenced here.
    with np.errstate(divide="ignore",invalid="ignore"):
        for e in range(len(dataset)-lookback):
            a=dclose[e:e+lookback]-dopen[e:e+lookback]
            body=(a - np.min(a))/np.ptp(a)

            a=dhigh[e:e+lookback]-dlow[e:e+lookback]
            range_=(a - np.min(a))/np.ptp(a)

            diff_1=body-ptn_body
            diff_2=range_-ptn_range
            # NOTE(review): a difference of exactly 1 also counts as a hit,
            # because the previous code used 1 as its hit marker. Kept so the
            # signals stay aligned with `engine` - confirm intent.
            hits_1=np.count_nonzero((np.abs(diff_1)<pct)|(diff_1==1))
            hits_2=np.count_nonzero((np.abs(diff_2)<pct)|(diff_2==1))
            if hits_1>=pct_total and hits_2>=pct_total:
                signal[e+lookback]=1

    return pd.Series(signal,index=dataset.index,name="signal")

def plot_pattern(n_pattern,exit_after_bar,direction):
    """Plot the equity of one backtested pattern and the candles that define it.

    Reads the module-level ``dataset``, ``lookback`` and, depending on
    ``direction``, ``big_res_long`` / ``big_data_long`` / ``position_up`` or
    their short counterparts.

    Args:
        n_pattern: index of the pattern in the backtest result lists.
        exit_after_bar: holding period; selects column
            ``"Ext_<exit_after_bar>_bar"`` of the pattern's result frame.
        direction: "long" or "short"; any other value draws nothing.

    Changes with respect to the previous version:
      * the title shows ``exit_after_bar`` (the column actually plotted)
        instead of the unrelated global ``n_days``;
      * the result column is copied before being re-indexed, so the frames
        stored in ``big_res_*`` are no longer modified by plotting;
      * ``plt.show()`` is issued for both directions, not only for "long".
    """
    if direction == "long":
        results,matched_bars,positions=big_res_long,big_data_long,position_up
    elif direction == "short":
        results,matched_bars,positions=big_res_short,big_data_short,position_down
    else:
        return

    name="Ext_"+str(exit_after_bar)+"_bar"
    split_day=int(exit_after_bar)

    equity_trades=results[n_pattern][name].copy()
    # Rows of `dataset` whose running bar number ("idx") produced a trade.
    idx=dataset.loc[dataset['idx'].isin(matched_bars[n_pattern])].index
    # NOTE(review): the exit is placed `split_day` calendar days after the
    # signal bar, which presumably assumes daily bars - confirm.
    equity_trades.index=pd.to_datetime(idx) + timedelta(days=split_day)
    equity_trades.cumsum().plot(figsize=(20,10),title="Equity Pattern N:"+str(n_pattern)+" Exit After "+str(exit_after_bar)+" Bar")
    plt.show()

    # The pattern is made of the `lookback` bars that precede its position.
    v1=positions[n_pattern][0]
    df=dataset.iloc[v1-lookback:v1]

    mpf.plot(df, type='candle', style='yahoo', volume=False, title='Pattern_N:'+str(n_pattern)+" "+direction ,axisoff=True)
        
def backtest(pct_accepted,bigpointvalue):
    """Run ``engine`` for every stored up and down pattern.

    Reads ``lookback``, ``n_days``, ``up_ls``, ``down_ls`` and the four price
    arrays from the enclosing scope.

    Args:
        pct_accepted: acceptance percentage, converted into the tolerance and
            the minimum hit count handed to ``engine``.
        bigpointvalue: multiplier applied to every result.

    Returns:
        (big_res_long, big_data_long, big_res_short, big_data_short): per
        pattern, a DataFrame with columns ``Ext_1_bar`` .. ``Ext_k_bar`` and
        the list of matching bar indices returned by ``engine``.
    """
    tolerance=round(1-float((1/100)*pct_accepted),1)
    min_hits=int((lookback/100)*pct_accepted)

    def run_side(patterns,side):
        # One engine pass per pattern; returns (result frames, matched bars).
        frames=[]
        matched=[]
        for k in tqdm(range(len(patterns))):
            reference_body=patterns[k][0]
            reference_range=patterns[k][1]

            outcome=engine(tolerance,min_hits,reference_body,reference_range,np_open, np_high, np_low, np_close,side,lookback,n_days)
            frame=pd.DataFrame(outcome[0])
            frame.columns=["Ext_"+str(bar)+"_bar" for bar in range(1,len(frame.columns)+1)]
            frames.append(frame*bigpointvalue)
            matched.append(outcome[1])
        return frames,matched

    print("BackTest Patterns Long")
    big_res_long,big_data_long=run_side(up_ls,"UP")

    print("BackTest Patterns Short")
    big_res_short,big_data_short=run_side(down_ls,"DOWN")

    return big_res_long , big_data_long , big_res_short , big_data_short

def Results_Patterns(big_res_long_short,ranking,direction):
    """Build a ranked performance table for every pattern and exit horizon.

    Args:
        big_res_long_short: list of result DataFrames (one per pattern), as
            produced by ``backtest`` for the chosen direction.
        ranking: column name (or list of names) used to sort the table in
            descending order.
        direction: "long" or "short"; only selects the printed header. Any
            other value returns None.

    Returns:
        DataFrame with one row per (pattern, exit column) holding the metrics
        of ``performance_report_gen`` plus "Exit" and "Ptn_N°"; an empty
        DataFrame when there is nothing to report.

    Changes with respect to the previous version:
      * the results passed in ``big_res_long_short`` are used; the argument
        was previously ignored in favour of the globals ``big_res_long`` /
        ``big_res_short``;
      * the rows are concatenated once instead of once per row;
      * an empty input returns an empty table instead of failing in the sort.
    """
    if direction == "long":
        print("Risultati Patterns Long")
    elif direction == "short":
        print("Risultati Patterns Short")
    else:
        return None

    reports=[]
    for e in tqdm(range(len(big_res_long_short))):
        x=big_res_long_short[e]
        for k in range(len(x.columns)):
            column=x.iloc[:,k]
            # Missing and zero results are not treated as trades.
            y=column.fillna(0)
            y=y[y!=0]
            # The last trade is excluded, exactly as before.
            # NOTE(review): presumably because the final engine result could
            # be unreliable - confirm whether this is still wanted.
            v=performance_report_gen(y.iloc[:-1])
            v["Exit"]=column.name
            v["Ptn_N°"]=e
            reports.append(v)

    if not reports:
        return pd.DataFrame()
    return pd.concat(reports).sort_values(by=ranking,ascending=False)
    
def pandas_df_to_numpy_array(dataset):
    """Number the rows of ``dataset`` and return its OHLC columns as arrays.

    Side effect: an "idx" column holding 0..len-1 is added to (or replaced
    in) the DataFrame that was passed in.

    Returns:
        Tuple ``(open, high, low, close)`` of the columns' ``.values``.
    """
    dataset["idx"] = list(range(len(dataset)))
    price_columns = ("open", "high", "low", "close")
    return tuple(getattr(dataset, column).values for column in price_columns)

In [None]:
def test_time_frame(tf):
    """Validate a time-frame label and return its length in minutes.

    The label is compared case-insensitively with the supported values
    ("1min", "5min", "15min", "30min", "60min"). For an unsupported label an
    error line is printed and None is returned.
    """
    supported = ["1min","5min","15min","30min","60min"]
    label = tf.lower()
    if label not in supported:
        print("ERRORE - Solo time frame a: ",supported," in ingresso")
        return None
    return int(label.split("min")[0])
    
def resample_standard_session(original_tf,resample_tf,dataset):
    """Resample OHLCV bars from ``original_tf`` to ``resample_tf``.

    Both time frames are labels accepted by ``test_time_frame``. The index is
    moved back by one source bar before resampling and forward by one target
    bar afterwards; rows containing NaN are dropped from the result.

    Returns:
        New DataFrame with open/high/low/close/volume columns; ``dataset``
        itself is not modified.
    """
    source_minutes = test_time_frame(original_tf)
    target_minutes = test_time_frame(resample_tf)

    shifted = dataset.copy()
    shifted.index = shifted.index - pd.DateOffset(minutes=source_minutes)

    ohlcv_rules = {'open' : 'first', 'high' : 'max', 'low' : 'min', 'close' : 'last','volume':'sum'}
    bars = shifted.resample(str(target_minutes)+'Min').agg(ohlcv_rules)
    bars.index = bars.index + pd.DateOffset(minutes=target_minutes)
    return bars.dropna()

def load_data_intraday_fast(filename):
    """Read a CSV with the pyarrow engine and index it by its ``date_time`` column.

    Returns:
        DataFrame whose index is ``date_time`` converted with
        ``pd.to_datetime``.
    """
    frame = pd.read_csv(filename, engine="pyarrow").set_index(["date_time"])
    frame.index = pd.to_datetime(frame.index)
    return frame

def resample_custom_session(original_tf,resample_tf,dataset,start_time,end_time):
    """Resample OHLCV bars inside a custom daily session.

    Args:
        original_tf, resample_tf: time-frame labels accepted by
            ``test_time_frame`` (source and target bar length).
        dataset: DataFrame with a DatetimeIndex and open/high/low/close/volume
            columns; it is not modified.
        start_time, end_time: session boundaries as 4-character "HHMM"
            strings.

    Returns:
        DataFrame of resampled bars sorted by time, or None (after printing an
        error line) when either boundary is not 4 characters long.

    How the result is assembled:
      1. The index is moved back by one source bar.
      2. Rows between the session start and end are resampled to the target
         time frame, anchored on the session start, and stamped one target
         bar later.
      3. The bar with the latest time of day is discarded and replaced by a
         bar built from the rows between the second-latest bar time and the
         session end minus one source bar, stamped at the session end.

    NOTE(review): step 3 needs at least two distinct bar times per day,
    otherwise ``test_h[-2]`` raises IndexError.
    """
    # The file only has `from datetime import timedelta`; the module itself is
    # needed for datetime.time / datetime.datetime / datetime.date below.
    import datetime

    original_tf=test_time_frame(original_tf)
    resample_tf=test_time_frame(resample_tf)
    df_0=dataset.copy()

    df_0.index=df_0.index - pd.DateOffset(minutes=original_tf)

    if len(start_time) != 4 or len(end_time) != 4:
        print("ERRORE - controlla gli orari di start & end")
        return None

    st_h=start_time[:2]
    st_m=start_time[2:]
    en_h=end_time[:2]
    en_m=end_time[2:]
    session_start=datetime.time(int(st_h), int(st_m))
    session_end=datetime.time(int(en_h), int(en_m))

    ohlcv_rules={'open': 'first','high': 'max','low':'min','close':'last','volume':'sum'}
    bar_times=df_0.index.time

    dati=df_0.loc[(bar_times>=session_start)&(bar_times<=session_end)]
    dati=dati.resample(str(resample_tf)+'Min',origin=st_h+":"+st_m+':00').agg(ohlcv_rules).dropna()
    dati.index=dati.index + pd.DateOffset(minutes=resample_tf)

    # Distinct times of day of the resampled bars, earliest first.
    test_h=sorted(list(set(dati.index.time)))

    # Session end minus one source bar. The dummy date only carries the
    # subtraction; year 2000 (instead of year 1) cannot underflow.
    bar_span=datetime.timedelta(minutes=original_tf)
    tmp_datetime=datetime.datetime.combine(datetime.date(2000, 1, 1), session_end)
    time2=(tmp_datetime - bar_span).time()

    # Closing bar of each day, stamped at the session end.
    dati1=df_0.loc[(bar_times>=test_h[-2])&(bar_times<=time2)]
    dati1=dati1.resample("D").agg(ohlcv_rules).dropna()
    dati1.index=dati1.index + pd.DateOffset(hours=int(en_h),minutes=int(en_m))

    custom_df=dati.loc[dati.index.time!=test_h[-1]].dropna()
    custom_df=pd.concat([custom_df,dati1])
    custom_df=custom_df.sort_index(ascending=True)
    return custom_df
    


In [None]:







    
    
def carica_storico(file_name,uct_offset,type_session,resample_tf,IS,OOS,custom_session_start,custom_session_stop,noise,pct_noise):
    """Load a price history file, convert its timestamps and resample it.

    Args:
        file_name: CSV file name, resolved inside the module-level
            ``dir_history`` directory.
        uct_offset: target time zone handed to ``tz_convert``; the timestamps
            in the file are treated as UTC ('Etc/Zulu').
        type_session: 1 = custom session (``resample_custom_session``),
            0 = standard session (``resample_standard_session``).
        resample_tf: target time-frame label.
        IS, OOS: unused, kept for interface compatibility.
        custom_session_start, custom_session_stop: "HHMM" session boundaries,
            only used when ``type_session`` is 1.
        noise: when equal to 1 the bars are passed through ``add_noise``.
        pct_noise: amount of noise forwarded to ``add_noise``.

    Returns:
        A copy of the resampled (and optionally noised) DataFrame.

    Raises:
        ValueError: if ``type_session`` is neither 0 nor 1. Previously this
            ended in an UnboundLocalError after the file had been loaded.

    Side effect: the working directory of the process is changed to
    ``dir_history``.

    The unused function-local ``talib`` import was removed, so TA-Lib is no
    longer required just to load a history file.
    """
    import os

    if type_session not in (0, 1):
        raise ValueError("type_session must be 0 (standard) or 1 (custom), got %r" % (type_session,))

    os.chdir(dir_history)
    data = load_data_intraday_fast(file_name)
    print("Caricato storico")
    data = data.sort_index(ascending=True)
    # Interpret the naive timestamps as UTC, move them to the requested zone
    # and drop the zone information again.
    data.index = data.index.tz_localize('Etc/Zulu')
    data.index = data.index.tz_convert(uct_offset )
    data.index = data.index.tz_localize(None)
    # Cells equal to 0 become NaN.
    data = data[data!=0]
    # The first and the last row are discarded.
    data = data.iloc[1:-1]
    print("Resample dei dati")
    # The source files are assumed to hold 5-minute bars (hard-coded).
    if type_session == 1:
        dataset=resample_custom_session("5min",resample_tf ,data,custom_session_start,custom_session_stop)
    else:
        dataset=resample_standard_session("5min",resample_tf,data)

    if noise == 1:
        print("AGGIUNGO RUMORE")
        dataset=add_noise(dataset.open, dataset.high, dataset.low, dataset.close,dataset.volume, pct_noise)

    history=dataset.copy()

    return history

def check_history_name(file_name,dir_history):
    """Resolve a symbol to the matching file name inside ``dir_history``.

    A directory entry matches when the text before its first "_", with every
    "@" removed, equals ``file_name``. The first match in ``os.listdir``
    order is returned; without a match ``file_name`` is returned unchanged.
    """
    import os
    candidates = (
        entry for entry in os.listdir(dir_history)
        if entry.split("_")[0].replace("@","") == file_name
    )
    return next(candidates, file_name)

def performance_report_gen(tradelist):
    """Summarise a series of trades as a one-row DataFrame.

    ``tradelist`` is handed unchanged to every metric helper, except the
    Sharpe entry which receives ``tradelist.values``; the "Profit" entry is
    the builtin ``sum`` over its elements.

    Returns:
        DataFrame with index ``[0]`` and the columns Profit, MaxDD,
        Operations, AverageTrade, ProfitFactor, PctWin, Sharpe,
        Kestner_Ratio.
    """
    # Keyword order fixes both the column order and the order in which the
    # metric helpers are evaluated.
    metrics = dict(
        Profit=sum(tradelist),
        MaxDD=max_draw_down(tradelist),
        Operations=operation_number(tradelist),
        AverageTrade=avg_trade(tradelist),
        ProfitFactor=profit_factor(tradelist),
        PctWin=percent_win(tradelist),
        Sharpe=old_sharpe_ratio(tradelist.values),
        Kestner_Ratio=kestner_ratio_daily(tradelist),
    )
    return pd.DataFrame(data=metrics, index=[0])


def kestner_ratio_daily(operations):
    """Return slope / (slope standard error * N) of the cumulative equity.

    The equity curve is ``operations.cumsum()``; it is regressed with
    ``scipy.stats.linregress`` against the bar counter 1..N, where N is
    ``operations.count()`` (the number of non-NaN entries).

    Args:
        operations: pandas Series of per-trade (or per-day) results.
            NOTE(review): if the series contains NaN, N is smaller than the
            length of the equity curve and ``linregress`` raises; callers in
            this file strip NaN/zero entries first - confirm for new callers.

    Returns:
        The ratio rounded to 2 decimals, or ``np.inf`` when there are 3 or
        fewer operations or the regression standard error is exactly 0.
    """
    # Function-scope imports kept so the block works wherever it is pasted;
    # the previously imported (and unused) matplotlib is no longer required.
    import numpy as np
    from scipy import stats

    if len(operations) <= 3:
        return np.inf

    equity = operations.cumsum()
    n_obs = operations.count()
    x = np.arange(1, n_obs + 1)
    gradient, _intercept, _r_value, _p_value, std_err = stats.linregress(x, equity)

    if std_err == 0 or n_obs <= 0:
        return np.inf
    return round(gradient / (std_err * n_obs), 2)
    
def generate_signal_live(dataset,lookback,pct_accettata,fraction_movement,n_patterny,direzione):
    """Flag the bars that follow a window matching the given pattern.

    Args:
        dataset: DataFrame with ``open``, ``high``, ``low``, ``close`` columns.
        lookback: number of bars in each compared window.
        pct_accettata: acceptance percentage; the tolerance is
            ``round(1 - pct_accettata/100, 1)`` and the required number of
            hits is ``int(lookback/100 * pct_accettata)``.
        fraction_movement: unused, kept for interface compatibility.
        n_patterny: the pattern itself, indexable as ``[body, range]``.
        direzione: "long" or "short"; both are evaluated identically here.

    Returns:
        Boolean Series named "signal" (True = window ending on the previous
        bar matched) sharing the index of ``dataset``, or None for any other
        ``direzione``.

    The signal is now written into a numpy buffer by position. The old chained
    assignment ``data["signal"][k] = True`` is silently ignored under pandas
    Copy-on-Write and relied on positional fallback for integer keys.
    """
    if direzione != "long" and direzione != "short":
        return None

    pct=round(1-float((1/100)*pct_accettata),1)
    pct_total=int((lookback/100)*pct_accettata)

    ptn_body=n_patterny[0]
    ptn_range=n_patterny[1]

    dclose=dataset.close.values
    dopen=dataset.open.values
    dhigh=dataset.high.values
    dlow=dataset.low.values
    signal=np.zeros(len(dataset),dtype=bool)

    # Flat windows divide by zero and turn into NaN, which never counts as a
    # hit; only the numpy warnings are silenced here.
    with np.errstate(divide="ignore",invalid="ignore"):
        for e in range(len(dataset)-lookback):
            a=dclose[e:e+lookback]-dopen[e:e+lookback]
            body=(a - np.min(a))/np.ptp(a)

            a=dhigh[e:e+lookback]-dlow[e:e+lookback]
            range_=(a - np.min(a))/np.ptp(a)

            diff_1=body-ptn_body
            diff_2=range_-ptn_range
            # NOTE(review): a difference of exactly 1 also counts as a hit,
            # because the previous code used 1 as its hit marker. Kept so the
            # signals stay aligned with `engine` - confirm intent.
            hits_1=np.count_nonzero((np.abs(diff_1)<pct)|(diff_1==1))
            hits_2=np.count_nonzero((np.abs(diff_2)<pct)|(diff_2==1))
            if hits_1>=pct_total and hits_2>=pct_total:
                signal[e+lookback]=True

    return pd.Series(signal,index=dataset.index,name="signal")
    
    

def old_sharpe_ratio(operations):
    """
    Ratio between the total profit
    and the standard deviation of the equity line.

    Returns ``np.inf`` when there are 3 or fewer operations or when the
    equity line has zero standard deviation; otherwise the ratio rounded to
    2 decimals.
    """
    if len(operations) <= 3:
        return np.inf

    equity_line = operations.cumsum()
    final_equity = equity_line[-1]
    dispersion = equity_line.std()
    return round(final_equity / dispersion,2) if dispersion != 0 else np.inf