# Setup

In [None]:
import numpy as np
from pandas_datareader import data as pdr
import yfinance as yf
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import pandas as pd
import math

# Raw Data Collection and Processing

In [None]:
# Route pandas-datareader's Yahoo reader through yfinance, then download the
# AAPL candles for the requested date range.
yf.pdr_override()
df = pdr.get_data_yahoo(
    "AAPL",
    start="2015-01-01",
    end="2023-05-07",
)
# Intraday alternative kept for reference:
# df = pdr.get_data_yahoo("EURUSD=X", start="2023-07-14", end="2023-08-15",interval = "15m")

[*********************100%%**********************]  1 of 1 completed


# Targets Extraction

In [None]:
# MAJOR-REVERSAL
def get_maxs(points):
    """Find strict local maxima of a 1-D sequence.

    A position ``i`` is a maximum when its value is strictly greater than both
    neighbours; the first and last positions are never reported.

    Args:
        points: list, NumPy array or pandas Series of comparable numbers.
            Access is always positional, so the Series index (dates, shifted
            integers, ...) does not matter.

    Returns:
        (maxs_index, maxs_value): list of integer positions and the list of
        values found at those positions.
    """
    # Positional view: avoids ``series[i]`` label/position ambiguity.
    values = np.asarray(points)
    if values.size < 3:
        return [], []
    inner = values[1:-1]
    is_peak = (inner > values[:-2]) & (inner > values[2:])
    maxs_index = (np.flatnonzero(is_peak) + 1).tolist()
    maxs_value = [values[i] for i in maxs_index]
    return maxs_index, maxs_value

def get_mins(points,maxs):
    """Locate the lowest point between each pair of consecutive maxima.

    Args:
        points: pandas Series to search.
        maxs: positions (integers) of the maxima, in ascending order.

    Returns:
        List with one integer position per consecutive pair of maxima: the
        position in ``points`` of the minimum strictly between the two.
    """
    def _lowest_between(left, right):
        # idxmin yields a label; translate it back to a position.
        label = points.iloc[left + 1:right].idxmin()
        return points.index.get_loc(label)

    return [_lowest_between(left, right) for left, right in zip(maxs, maxs[1:])]

def get_major_reversals(high_points,low_points):
    """Return index labels of major lows and major highs.

    Major highs are the local maxima of the sequence of local maxima of
    ``high_points``; major lows are the minima of ``low_points`` between
    consecutive major highs.

    Returns:
        (mins_labels, maxs_labels): entries of ``high_points.index``.
    """
    peak_positions, peak_values = get_maxs(high_points)
    # Second pass: peaks among the peaks.
    major_among_peaks, _ = get_maxs(peak_values)
    major_peak_positions = [peak_positions[k] for k in major_among_peaks]
    trough_positions = get_mins(low_points, major_peak_positions)
    labels = high_points.index
    return labels[trough_positions], labels[major_peak_positions]

# MINOR-REVERSAL
def before_up_mask(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of rows that were NOT reached by an upward move.

    A row counts as reached by an upward move when it rose at least
    ``avg_candle * factor_1`` over the previous row, or when it sits in the
    middle of two consecutive rises larger than ``avg_candle * factor_2``.
    """
    previous = points.shift(1)
    following = points.shift(-1)
    small_step = avg_candle * factor_2
    jumped_up = points >= (previous + (avg_candle * factor_1))
    mid_climb = (previous < (points - small_step)) & (points < (following - small_step))
    return ~(jumped_up | mid_climb)
def next_up_mask(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of rows that are followed by an upward move.

    True when the next row is at least ``avg_candle * factor_1`` higher, or
    when the next two rows each rise by more than ``avg_candle * factor_2``.
    """
    one_ahead = points.shift(-1)
    two_ahead = points.shift(-2)
    small_step = avg_candle * factor_2
    jumps_up = one_ahead >= (points + (avg_candle * factor_1))
    climbs_twice = (points < (one_ahead - small_step)) & (one_ahead < (two_ahead - small_step))
    return jumps_up | climbs_twice
def is_up(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of upward minor reversals.

    A row qualifies when an upward move follows it (``next_up_mask``) and no
    upward move led into it (``before_up_mask``).

    ``factor_1`` and ``factor_2`` are forwarded to both masks; previously they
    were accepted but silently ignored.
    """
    rises_next = next_up_mask(points, avg_candle, factor_1, factor_2)
    not_rising_before = before_up_mask(points, avg_candle, factor_1, factor_2)
    return rises_next & not_rising_before
def before_down_mask(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of rows that were NOT reached by a downward move.

    A row counts as reached by a downward move when it fell at least
    ``avg_candle * factor_1`` from the previous row, or when it sits in the
    middle of two consecutive falls larger than ``avg_candle * factor_2``.
    """
    previous = points.shift(1)
    following = points.shift(-1)
    small_step = avg_candle * factor_2
    dropped = points <= (previous - (avg_candle * factor_1))
    mid_slide = (previous > (points + small_step)) & (points > (following + small_step))
    return ~(dropped | mid_slide)
def next_down_mask(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of rows that are followed by a downward move.

    True when the next row is at least ``avg_candle * factor_1`` lower, or
    when the next two rows each fall by more than ``avg_candle * factor_2``.
    """
    one_ahead = points.shift(-1)
    two_ahead = points.shift(-2)
    small_step = avg_candle * factor_2
    drops = one_ahead <= (points - (avg_candle * factor_1))
    slides_twice = (points > (one_ahead + small_step)) & (one_ahead > (two_ahead + small_step))
    return drops | slides_twice
def is_down(points,avg_candle,factor_1 = 0.9, factor_2 = 0.1):
    """Mask of downward minor reversals.

    A row qualifies when a downward move follows it (``next_down_mask``) and
    no downward move led into it (``before_down_mask``).

    ``factor_1`` and ``factor_2`` mirror the signature of ``is_up`` and are
    forwarded to both masks; the defaults reproduce the previous behaviour.
    """
    falls_next = next_down_mask(points, avg_candle, factor_1, factor_2)
    not_falling_before = before_down_mask(points, avg_candle, factor_1, factor_2)
    return falls_next & not_falling_before
def is_steady(high_points,low_points,avg_candle):
    """Mask of rows followed by neither an upward move of the lows nor a
    downward move of the highs."""
    lows_not_rising = ~next_up_mask(low_points, avg_candle)
    highs_not_falling = ~next_down_mask(high_points, avg_candle)
    return lows_not_rising & highs_not_falling

def get_minor_reversals(high_points,low_points, window=14):
    """Return index labels of minor up reversals, down reversals and the
    starts of steady stretches.

    Args:
        high_points: Series of candle highs.
        low_points: Series of candle lows (same index as ``high_points``).
        window: number of candles used for the rolling average candle height
            (default 14, the previously hard-coded value).

    Returns:
        (up_reversal, down_reversal, steady): three index objects.
    """
    avg_candle = (high_points - low_points).rolling(window=window).mean()
    down_reversal = high_points[is_down(high_points, avg_candle)].index
    up_reversal = low_points[is_up(low_points, avg_candle)].index
    steady_mask = is_steady(high_points, low_points, avg_candle)
    # fill_value keeps the shifted mask boolean; shift().fillna(True) goes
    # through object dtype, where `~` is not a reliable logical negation.
    previous_steady = steady_mask.shift(1, fill_value=True)
    # Keep only the first row of every steady run.
    steady = low_points[steady_mask & ~previous_steady].index
    return up_reversal, down_reversal, steady

# TREND-SHIFT
def find_outliers(chunk, q1 = 0.4, q2 = 0.6):
    """Return the entries of ``chunk`` outside the quantile fences.

    The fences lie 1.5 times the inter-quantile range below the ``q1``
    quantile and above the ``q2`` quantile.
    """
    low_q = chunk.quantile(q1)
    high_q = chunk.quantile(q2)
    margin = 1.5 * (high_q - low_q)
    too_low = chunk < (low_q - margin)
    too_high = chunk > (high_q + margin)
    return chunk[too_low | too_high]
def get_trend_shift(high_points,low_points):
    """Return index labels where the slope angle of the mid price changes
    abnormally.

    The angle (in degrees) of each step of the high/low midpoint is
    differenced, the differences are split into consecutive chunks of 200
    rows, and ``find_outliers`` is applied to every chunk.
    """
    chunk_size = 200
    mid_price = (low_points + high_points) / 2
    slope_angle = mid_price.diff(periods=-1).apply(lambda step: math.degrees(math.atan(step)))
    # Positional index so that rows can be bucketed by integer division.
    angle_change = slope_angle.diff().reset_index(drop=True)
    chunk_id = angle_change.index // chunk_size
    outliers = angle_change.groupby(chunk_id).apply(find_outliers)
    # Drop the chunk level; what remains are positions into the input.
    outlier_positions = outliers.reset_index(level=0, drop=True).index
    return high_points.index[outlier_positions]

# Major_Trend (categorical): -1,0,1
def get_major_trend(mins_index,maxs_index,original_idx_list,len_df):
    """Build a per-row trend array from the major reversal positions.

    Rows strictly between two boundaries share one trend value (+1 or -1)
    which flips at every reversal; the reversal rows themselves stay 0.
    The first segment is -1 when the earliest minimum precedes the earliest
    maximum, otherwise +1.

    Args:
        mins_index, maxs_index: collections whose minima are compared to pick
            the starting direction.
        original_idx_list: sorted integer positions of all major reversals.
        len_df: number of rows.

    Returns:
        NumPy float array of length ``len_df``.
    """
    # Sentinels -1 and len_df close the first and last segments.
    boundaries = np.append(np.insert(original_idx_list, 0, -1, axis=0), len_df)
    trend_array = np.zeros(len_df)
    direction = -1 if min(mins_index) < min(maxs_index) else 1
    for left, right in zip(boundaries[:-1], boundaries[1:]):
        trend_array[left + 1:right] = direction
        direction = -direction
    return trend_array

# Number of candles before next major reversal (discrete): 0 to inf
# Change before next major reversal (continuous): 0 to inf
def get_Metrics_Before_Next_Major_Reversal(points,original_idx_list,len_df):
    """Per-row distance to, and relative price change until, the next major
    reversal.

    Args:
        points: pandas Series of prices (accessed positionally).
        original_idx_list: sorted integer positions of the major reversals.
        len_df: total number of rows.

    Returns:
        (nCandles_array, change_array): two float arrays of length ``len_df``.
        For a row before a reversal, the first holds the number of candles
        left until that reversal and the second holds
        ``(price_at_reversal - price) / price``. Rows from the last reversal
        onwards are NaN. With no reversals, both arrays are entirely NaN.
    """
    candle_parts = []
    change_parts = []
    prev_reversal = 0
    for reversal in original_idx_list:
        candle_parts.append(np.arange(reversal - prev_reversal, 0, -1))
        partition = points.iloc[prev_reversal:reversal]
        target = points.iloc[reversal]
        change_parts.append(((target - partition) / partition).to_numpy())
        prev_reversal = reversal
    # prev_reversal is the last reversal, or 0 when the list was empty.
    tail = np.full(len_df - prev_reversal, np.nan)
    # Single concatenation instead of re-allocating on every iteration.
    nCandles_array = np.concatenate(candle_parts + [tail])
    change_array = np.concatenate(change_parts + [tail])
    return nCandles_array, change_array

# Visualize and inspect cause it is not really good
def get_fractals(high , low, period) :
    """Return index labels of bearish and bullish fractals.

    A bearish fractal is a row whose high is strictly greater than the highs
    of the ``period`` rows on each side; a bullish fractal is a row whose low
    is strictly lower than the lows of the ``period`` rows on each side.

    Returns:
        (bears_index, bulls_index)
    """
    offsets = list(range(-period, 0)) + list(range(1, period + 1))

    def _extreme_labels(series, compare):
        checks = [compare(series, series.shift(offset)) for offset in offsets]
        flags = pd.Series(np.logical_and.reduce(checks), index=high.index)
        return flags[flags == True].index

    bears_index = _extreme_labels(high, lambda current, other: current > other)
    bulls_index = _extreme_labels(low, lambda current, other: current < other)
    return bears_index, bulls_index

In [None]:
def _set_labels(df, column, labelled_rows, default=0):
    """Create ``column`` filled with ``default`` and overwrite selected rows.

    ``labelled_rows`` is an ordered iterable of ``(index_labels, value)``;
    later entries win when labels overlap. A single ``.loc`` assignment
    replaces the chained ``df[col][labels] = value``, which raises
    SettingWithCopyWarning and does not write through under copy-on-write.
    """
    df[column] = default
    for row_labels, value in labelled_rows:
        df.loc[row_labels, column] = value


def add_labels(df):
    """Add the target-label columns to ``df`` in place.

    Expects ``High``, ``Low`` and ``Open`` columns. Adds major reversal types
    (high/low based and open based, plus the open based column shifted by one
    row in each direction), fractal flags, major trend (plus shifted copies),
    candles/change until the next major reversal, minor reversal type and a
    trend-shift flag. Returns ``None``.
    """
    # Major_Reversal_Type_HL: 0,-1,1
    mins_index_hl, maxs_index_hl = get_major_reversals(df.High, df.Low)
    _set_labels(df, "Major_Reversal_Type_HL", [(mins_index_hl, -1), (maxs_index_hl, 1)])
    # Major_Reversal_Type_Open: 0,-1,1
    mins_index_open, maxs_index_open = get_major_reversals(df.Open, df.Open)
    _set_labels(df, "Major_Reversal_Type_Open", [(mins_index_open, -1), (maxs_index_open, 1)])
    # Next_/Previous_Major_Reversal_Type_Open: the column shifted by one row
    df["Next_Major_Reversal_Type_Open"] = df["Major_Reversal_Type_Open"].shift(-1)
    df["Previous_Major_Reversal_Type_Open"] = df["Major_Reversal_Type_Open"].shift(1)
    # is_Fractal_HL_2: -1,0,1
    max_fractals_HL_2_index, min_fractals_HL_2_index = get_fractals(df.High, df.Low, 2)
    _set_labels(df, "is_Fractal_HL_2", [(min_fractals_HL_2_index, -1), (max_fractals_HL_2_index, 1)])
    # is_Fractal_Open_3: -1,0,1
    max_fractals_O_3_index, min_fractals_O_3_index = get_fractals(df.Open, df.Open, 3)
    _set_labels(df, "is_Fractal_Open_3", [(min_fractals_O_3_index, -1), (max_fractals_O_3_index, 1)])
    # Major_Trend (categorical): -1,0,1 -- needs integer positions of all open-based reversals
    original_idx_list = np.sort(np.concatenate([
        np.where(df.index.isin(mins_index_open))[0],
        np.where(df.index.isin(maxs_index_open))[0],
    ]))
    df["Major_Trend"] = get_major_trend(mins_index_open, maxs_index_open, original_idx_list, len(df))
    df["Next_Major_Trend"] = df["Major_Trend"].shift(-1, axis=0)
    df["Previous_Major_Trend"] = df["Major_Trend"].shift(1, axis=0)
    # Number of candles before next major reversal (discrete)
    # Change before next major reversal (continuous)
    nCandles_array, change_array = get_Metrics_Before_Next_Major_Reversal(df.Open, original_idx_list, len(df))
    df["nCandles_Before_Major_Reversal"] = nCandles_array
    df["Change_Before_Major_Reversal"] = change_array
    # Minor_Reversal_Type: -1 up, 1 down, 2 start of a steady stretch (steady wins on overlap)
    up_reversal, down_reversal, steady = get_minor_reversals(df.High, df.Low)
    _set_labels(df, "Minor_Reversal_Type", [(up_reversal, -1), (down_reversal, 1), (steady, 2)])
    # Trend_Shift: computed on the high/low midpoint (High was previously passed twice)
    trend_shift = get_trend_shift(df.High, df.Low)
    _set_labels(df, "is_Trend_Shift", [(trend_shift, 1)])

In [None]:
# Attach every target-label column to df (add_labels modifies df in place).
add_labels(df)

# Target Variable manual inspection

In [None]:
# TARGET VARIABLES MANUAL INSPECTION

# Copy AFTER slicing so that add_labels writes into an independent frame
# rather than a slice (the source of the SettingWithCopy warnings).
temp_df = df.iloc[600:800].copy()

add_labels(temp_df)

fig = go.Figure(data=[go.Candlestick(x=temp_df.index,
                open=temp_df['Open'],
                high=temp_df['High'],
                low=temp_df['Low'],
                close=temp_df['Close'])])
fig.update_layout(xaxis_title='Date')

# One arrow per open-based fractal: orange for is_Fractal_Open_3 == 1,
# purple for is_Fractal_Open_3 == -1.
for fractal_value, arrow_dy, arrow_color in ((1, -30, "orange"), (-1, 30, "purple")):
    for i in temp_df[temp_df.is_Fractal_Open_3 == fractal_value].index:
        fig.add_annotation(x=i, y=temp_df.Open[i], ax=0, ay=arrow_dy,
                           showarrow=True, arrowhead=1, arrowcolor=arrow_color)

fig.show()



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/i

# Feature Extraction

In [None]:
#### FEATURE EXTRACTION
def is_increase(points):
    """Return 1 where a value is greater than the previous row, else 0
    (the first row is always 0)."""
    previous = points.shift(1)
    return points.gt(previous).astype(int)

def change(points):
    """Return the difference between each value and the previous row."""
    previous = points.shift(1)
    return points.sub(previous)

def percentage_change(points):
    """Return the row-over-row change as a percentage of the previous row."""
    previous = points.shift(1)
    return points.sub(previous).div(previous).mul(100)

def trend_age(df,mins,maxs):
    """Add a ``trend_age`` column counting rows since the last reversal.

    The counter starts at 1 and restarts at 1 on the row right after any
    position listed in ``mins`` or ``maxs``.

    Args:
        df: DataFrame to modify in place.
        mins, maxs: iterables of integer row positions (lists or arrays).

    Returns:
        None.
    """
    # A set gives O(1) membership, and also works for NumPy arrays, where
    # `mins + maxs` would add element-wise instead of concatenating.
    reversal_positions = set(mins) | set(maxs)
    ages = []
    segment_start = 0
    for i in range(len(df)):
        if i - 1 in reversal_positions:
            segment_start = i
        ages.append(i - segment_start + 1)
    df["trend_age"] = ages


# Support Resistance
def isFarFromLevel(l,s,levels):
    """Return True when no value in ``levels.Value`` lies closer than ``s``
    to the candidate level ``l``."""
    nearby = sum(1 for existing in levels.Value.values if abs(l - existing) < s)
    return nearby == 0

def get_levels(max_maxs, min_mins, high_points,low_points,df):
    """Collect support/resistance levels from major highs and lows.

    Reversal positions are visited in ascending order; the price at each one
    (high for ``max_maxs``, low for ``min_mins``) becomes a level unless an
    already accepted level lies closer than the mean candle height.

    Args:
        max_maxs, min_mins: lists of integer row positions.
        high_points, low_points: Series of highs and lows (read positionally).
        df: DataFrame with ``High`` and ``Low`` columns, used for the mean
            candle height.

    Returns:
        DataFrame with columns ``Start`` (index label of the reversal) and
        ``Value`` (level price), sorted by ``Start``.
    """
    lvls = pd.DataFrame(columns = ['Start', 'Value'])
    s = np.mean(df['High'] - df['Low'])
    max_positions = set(max_maxs)
    min_positions = set(min_mins)
    for i in sorted(max_maxs + min_mins):
        if i in max_positions:
            # .iloc: i is a position; plain [i] would be treated as a label.
            l = high_points.iloc[i]
            if isFarFromLevel(l,s,lvls):
                lvls.loc[len(lvls)] = [high_points.index[i], l]
        if i in min_positions:
            l = low_points.iloc[i]
            if isFarFromLevel(l,s,lvls):
                lvls.loc[len(lvls)] = [high_points.index[i], l]
    lvls = lvls.sort_values("Start").reset_index(drop = True)
    return lvls

# Per-level statistics tracked for every support/resistance level; also used
# as suffixes of the below_/above_/through_ feature columns.
lvls_features = [
    "age", "last_MReversal", "nCandle", "nCandleTime",
    "nReversal", "nMReversal",
    "nReversalClose", "nReversalOpen", "nReversalHigh", "nReversalLow",
    "nMReversalClose", "nMReversalOpen", "nMReversalHigh", "nMReversalLow",
]
def levels_propreties(df,lvls,analysis,current,reversal_dict):
    """Update the per-level statistics in ``analysis`` for the candle at ``current``.

    Args:
        df: candle DataFrame; the code reads its ``Low``, ``High``,
            ``avg_candle`` and ``Reversal`` columns.
        lvls: DataFrame with ``Start`` and ``Value`` columns (one row per level).
        analysis: DataFrame with a ``Lvl`` column plus the statistic columns
            that are updated here; modified in place and returned.
        current: index label of the candle being processed.
        reversal_dict: nested mapping ``{ohlc_key: {reversal_key: labels}}``;
            ``labels`` must support ``current in labels``.

    Returns:
        The same ``analysis`` object.
    """
    # Only levels that started strictly before the current candle are considered.
    cut_lvls = lvls[lvls["Start"] < current]
    current_candle = df[df.index == current]
    for i in range(cut_lvls.shape[0]):
      # Label lookup by i -- assumes cut_lvls keeps labels 0..k-1 (TODO confirm with caller).
      l = cut_lvls.Value[i]
      s = cut_lvls.Start[i]
      # Rows of `analysis` that describe this level (matched on the level price).
      analysis_index = analysis[analysis.Lvl == l].index
      # age: whole days between the level's start and the current candle
      age = (current - s).days
      analysis.loc[analysis_index,'age'] = age
      # last_MReversal grows by one on every call; it is reset below on a touching reversal candle
      analysis.loc[analysis_index,'last_MReversal'] += 1
      # tolerance band around the level: one third of the average candle height
      zone = df.avg_candle[current] / 3
      # touching = level strictly inside the candle range, OR the candle's Low is above the
      # level by less than `zone`, OR the candle's High is below the level by less than `zone`
      touching = ((current_candle.Low - l < 0) & (current_candle.High - l > 0)) | ((current_candle.Low - l < zone) & (current_candle.Low - l > 0)) | ((l - current_candle.High < zone) & (current_candle.High - l < 0))
      # reduce the one-row boolean Series to a scalar
      touching = touching[touching.index[0]]
      if touching:
         analysis.loc[analysis_index,'nCandle'] += 1
         # reset last_MReversal when the current candle is flagged Reversal == 1
         if df[df.index == current].Reversal.iloc[0] == 1:
            analysis.loc[analysis_index,'last_MReversal'] = 0
         # nCandleTime: touches per day of age
         # NOTE(review): age can be 0 when the level started less than a day ago -- confirm the division is intended
         nCandle = analysis.loc[analysis_index,'nCandle']
         analysis.loc[analysis_index,'nCandleTime'] = nCandle / age
         # nReversal / nMReversal are bumped at most once per call for this level,
         # however many price series report a reversal at `current`
         added = False
         Madded = False
         for ohlc_key in reversal_dict:
           for reversal_key in reversal_dict[ohlc_key]:
             if current in reversal_dict[ohlc_key][reversal_key]:
                # per-series counter, column named n<reversal_key><ohlc_key>
                analysis.loc[analysis_index,f"n{reversal_key}{ohlc_key}"] += 1
                if reversal_key == "Reversal":
                  if not added:
                    analysis.loc[analysis_index,"nReversal"] += 1
                    added = True
                else:
                  if not Madded:
                    analysis.loc[analysis_index,"nMReversal"] += 1
                    Madded = True
    return analysis

def add_SR(df):
   """Add support/resistance features to ``df`` in place.

   For every candle, the nearest level at or below its Low ("below"), the
   nearest level at or above its High ("above") and a level strictly inside
   its Low-High range ("through") are looked up among the levels that started
   before that candle. Price distances to those levels and the level
   statistics listed in ``lvls_features`` are written to the row.
   ``Relative-*`` columns divide each distance by ``df["avg_candle"]``, so
   that column must exist before calling. Returns ``None``.

   Relies on ``get_reversal``, which is not defined in this section.
   """
   # Distance columns, all initialised to 0.
   df["High-below"] = 0
   df["Low-below"] = 0
   df["Close-below"] = 0
   df["Open-below"] = 0
   df["Min-below"] = 0
   df["Max-below"] = 0
   df["above-High"] = 0
   df["above-Low"] = 0
   df["above-Close"] = 0
   df["above-Open"] = 0
   df["above-Min"] = 0
   df["above-Max"] = 0
   df['through-High'] = 0
   df['through-Low'] = 0
   df['through-Close'] = 0
   df['through-Open'] = 0
   df["through-Min"] = 0
   df["through-Max"] = 0

   # Flags set to 1 when no matching level exists for the candle.
   df["no_below"] = 0
   df["no_above"] = 0
   df["no_through"] = 0

   # One column per level statistic for each of the three level roles.
   for f in lvls_features:
     df[f"below_{f}"] = 0
     df[f"above_{f}"] = 0
     df[f"through_{f}"] = 0

  #  # number of lvls broken
  #  df["nBreakout"] = 0
  #  # strong level breakout
  #  df["nMBreakout"] = 0

   # Build the levels from the major highs (peaks among peaks of High) and the lows between them.
   high_points = df.High
   low_points = df.Low
   # presumably get_reversal returns (mins, maxs, ...) as integer positions -- TODO confirm
   mins, maxs,r,r = get_reversal(high_points)
   max_points = high_points.loc[high_points.index[maxs]]
   max_mins, max_maxs,r,r = get_reversal(max_points)
   # translate positions within max_points back to positions within high_points
   max_maxs = [maxs[i] for i in max_maxs]
   min_mins = get_mins(low_points,max_maxs)
   lvls = get_levels(max_maxs,min_mins,high_points,low_points,df)
   # Reversal / major-reversal index labels per price series.
   reversal_dict = {"Close": {"Reversal":[],"MReversal":[]},"Open": {"Reversal":[],"MReversal":[]}
              ,"High": {"Reversal":[],"MReversal":[]},"Low": {"Reversal":[],"MReversal":[]}}
   # Order must match the key order of reversal_dict (Close, Open, High, Low) because of the zip below.
   OHLC = [df.Close,df.Open,df.High,df.Low]
   for points,key in zip(OHLC,reversal_dict):
    mins, maxs,r,r = get_reversal(points)
    reversal_dict[key]["Reversal"] = df.index[maxs+mins]
    max_mins, max_maxs,r,r = get_reversal(points.loc[points.index[maxs]])
    max_maxs = [maxs[i] for i in max_maxs]
    min_mins = get_mins(points,max_maxs)
    reversal_dict[key]["MReversal"] = df.index[max_maxs+min_mins]
   # Per-level statistics table: one row per level, every statistic starts at 0.
   prop = pd.DataFrame(0, index=np.arange(len(lvls)),columns = ['Lvl','Start']+lvls_features)
   prop["Lvl"] = lvls["Value"]
   prop["Start"] = lvls["Start"]

   # Walk the candles in order, updating the level statistics and reading them back.
   for i in range(df.shape[0]):
    current = df.index[i]
    prop = levels_propreties(df, lvls, prop, current,reversal_dict)
    # only levels that started strictly before this candle
    cut_prop = prop[prop["Start"] < current]
    # resistance: the lowest level at or above the candle's High
    resistances = cut_prop.Lvl[cut_prop.Lvl - df.High[current] >= 0]
    resistance = None
    if len(resistances) > 0:
        # idxmin returns a label that is used positionally here -- assumes cut_prop labels are 0..k-1 (TODO confirm)
        resistance = cut_prop.iloc[resistances.idxmin()]
    # support: the highest level at or below the candle's Low
    supports = cut_prop.Lvl[cut_prop.Lvl - df.Low[current] <= 0]
    support = None
    if len(supports) > 0:
        support = cut_prop.iloc[supports.idxmax()]
    # through: levels strictly between the candle's Low and High
    throughs = cut_prop.Lvl[(cut_prop.Lvl - df.Low[current] > 0) & (cut_prop.Lvl - df.High[current] < 0)]
    through = None
    if len(throughs) > 0:
        through = cut_prop.iloc[throughs.idxmax()] # the highest crossing level; the original author notes this choice is arbitrary
    # lower / upper edge of the candle body
    Min = min(df.Open[current],df.Close[current])
    Max = max(df.Open[current],df.Close[current])
    if type(support) != type(None):
        df.loc[current, 'High-below'] = df.High[current] - support.Lvl
        df.loc[current, 'Low-below'] = df.Low[current] - support.Lvl
        df.loc[current, 'Close-below'] = df.Close[current] - support.Lvl
        df.loc[current, 'Open-below'] = df.Open[current] - support.Lvl
        df.loc[current, 'Min-below'] =  Min - support.Lvl
        df.loc[current, 'Max-below'] = Max - support.Lvl

        # [2:] skips the Lvl and Start fields and copies the statistics only
        for val, column in zip(support[2:], support.index[2:]):
            df.loc[current, f'below_{column}'] = val
    else:
        df.loc[current, 'no_below'] = 1
    if type(resistance) != type(None):
        df.loc[current, 'above-High'] = resistance.Lvl - df.High[current]
        df.loc[current, 'above-Low'] = resistance.Lvl - df.Low[current]
        df.loc[current, 'above-Close'] = resistance.Lvl - df.Close[current]
        df.loc[current, 'above-Open'] = resistance.Lvl - df.Open[current]
        df.loc[current, 'above-Min'] = resistance.Lvl - Min
        df.loc[current, 'above-Max'] = resistance.Lvl - Max
        for val, column in zip(resistance[2:], resistance.index[2:]):
            df.loc[current, f'above_{column}'] = val
    else:
        df.loc[current, 'no_above'] = 1
    if type(through) != type(None):
        df.loc[current, 'through-High'] = through.Lvl - df.High[current]
        df.loc[current, 'through-Low'] = through.Lvl - df.Low[current]
        df.loc[current, 'through-Close'] = through.Lvl - df.Close[current]
        df.loc[current, 'through-Open'] = through.Lvl - df.Open[current]
        df.loc[current, 'through-Min'] = through.Lvl - Min
        df.loc[current, 'through-Max'] = through.Lvl - Max
        for val, column in zip(through[2:], through.index[2:]):
            df.loc[current, f'through_{column}'] = val
    else:
        df.loc[current, 'no_through'] = 1

   # Express every distance as a multiple of the average candle height.
   distances = ["High-below","Low-below","Close-below","Open-below","Min-below","Max-below",
                "above-High","above-Low","above-Close","above-Open","above-Min","above-Max",
                "through-High","through-Low","through-Close","through-Open",'through-Max','through-Min']
   for d in distances:
     df[f"Relative-{d}"] = df[d] / df["avg_candle"]


In [None]:
# Cap the data at the first 10,000 rows. The explicit copy makes the result an
# independent frame, so the column assignments in later cells do not write
# into a slice of the original.
df = df.iloc[:10000].copy()

In [None]:
# adding the reversal, rates, and trend
# Derive reversal positions from the highs/lows and attach the reversal and trend
# features. NOTE(review): get_reversal, add_reversal, add_major_trend,
# add_minor_trend and add_trend_age are not defined in this part of the notebook.
high_points = df.High
low_points = df.Low
# presumably get_reversal returns (mins, maxs, ...) as integer positions -- TODO confirm
mins, maxs,r,r = get_reversal(high_points)
max_points = high_points.loc[high_points.index[maxs]]
max_mins, max_maxs,r,r = get_reversal(max_points)
# translate positions within max_points back to positions within high_points
max_maxs = [maxs[i] for i in max_maxs]
min_mins = get_mins(low_points,max_maxs)

# Earlier single-series variant, kept for reference:
# mins, maxs,r,r = get_reversal(points)
# max_points = points.loc[points.index[maxs]]
# max_mins, max_maxs,r,r = get_reversal(max_points)
# max_maxs = [maxs[i] for i in max_maxs]
# mins = get_mins(points,max_maxs)
# maxs = max_maxs


# Known limitation (original author): the reversal rate is not accurate here
# because it should also include the low points.
add_reversal(df,high_points,min_mins, max_maxs)
add_major_trend(df,min_mins,max_maxs)
add_minor_trend(df,mins,maxs)
add_trend_age(df,min_mins,max_maxs)

# Average candle height over a trailing 7-calendar-day window ('7D' is a time
# offset, not a count of 7 candles): rolling mean of High minus rolling mean of Low.
df["avg_candle"] = df['High'].rolling('7D').mean() - df['Low'].rolling('7D').mean()

# Support/resistance features (add_SR reads the avg_candle column created above).
add_SR(df)

In [None]:
!pip install pandas_ta

In [None]:
import pandas_ta as ta

# Moving averages and RSI over a range of look-back lengths.
lens = [5,7,10,14,20,30,40,50,60,100,150,200,250,300]
for l in lens:
    df[f'sma_{l}'] = ta.sma(df['Close'], length=l)
    df[f'ema_{l}'] = ta.ema(df['Close'], length=l)
    df[f'RSI_{l}'] = ta.rsi(df.Close, length=l)

df['obv'] = ta.obv(df['Close'], df['Volume'])
df['atr'] = ta.atr(df['High'], df['Low'], df['Close'])
df["VWAP"] = ta.vwap(df.High, df.Low, df.Close, df.Volume)

# Bollinger bands; the joined column names (BBL_14_2.0, ...) are read below.
my_bbands = ta.bbands(df.Close, length=14, std=2.0)
df = df.join(my_bbands)
# Drop the rows where any indicator (or label) is missing.
df = df.dropna()

# Distance of every candle price to each moving average and Bollinger band.
candle = ["Open","Low","Close","High"]
for l in lens:
    for c in candle:
        df[f'sma_{l}-{c}'] = df[c] - df[f'sma_{l}']
        # Fixed: the EMA distance was computed from the SMA column.
        df[f'ema_{l}-{c}'] = df[c] - df[f'ema_{l}']
for c in candle:
    df[f"BBL_14_2.0-{c}"] = df[c] - df["BBL_14_2.0"]
    df[f"BBM_14_2.0-{c}"] = df[c] - df["BBM_14_2.0"]
    df[f"BBU_14_2.0-{c}"] = df[c] - df["BBU_14_2.0"]



# Evaluation

In [None]:
def get_positions(signals,Open):
    """Turn a signal sequence into a table of consecutive trades.

    A position is opened at the first ``-1`` (buy) or ``1`` (sell) of each run
    of identical non-zero signals and is closed when the opposite position
    opens, so buys and sells alternate.

    Args:
        signals: sequence of -1 / 0 / 1, one per bar.
        Open: sequence of open prices of the same length. It is not modified
            (previously its index was overwritten in place).

    Returns:
        DataFrame with one row per closed trade: ``price_1`` (entry price),
        ``price_2`` (exit price = next entry price), ``buy_or_sell`` (1 for a
        buy, -1 for a sell) and ``bars`` (bars between entry and exit). Empty
        when fewer than two positions are opened.
    """
    signals = pd.Series(signals).reset_index(drop=True)
    # Fresh Series on the positional index; leaves the caller's object untouched.
    open_prices = pd.Series(np.asarray(Open), index=signals.index)
    # Last non-zero signal seen strictly before each bar (NaN if none yet).
    last_signal = signals.where(signals != 0).ffill().shift()
    buy_mask = (signals == -1) & (last_signal != -1)
    sell_mask = (signals == 1) & (last_signal != 1)
    entry_prices = open_prices[buy_mask | sell_mask]

    columns = ["price_1", "price_2", "buy_or_sell", "bars"]
    if len(entry_prices) < 2:
        # No trade is ever closed (this case used to raise IndexError).
        return pd.DataFrame({name: pd.Series(dtype=float) for name in columns})

    first_side = 1 if signals[entry_prices.index[0]] == -1 else -1
    sides = np.tile([first_side, -first_side], len(entry_prices) // 2 + 1)[:len(entry_prices)]
    # Duration of trade k is entry bar k+1 minus entry bar k.
    bars = pd.Series(entry_prices.index).diff().shift(-1)
    entry_prices = entry_prices.reset_index(drop=True)
    positions = pd.DataFrame({
        "price_1": entry_prices,
        "price_2": entry_prices.shift(-1),
        "buy_or_sell": sides,
        "bars": bars,
    }).dropna()  # drops the last, still-open position
    return positions

def generate_random_signals(len_df):
    """Return a Series of ``len_df`` signals drawn uniformly from 0, 1 and -1
    using NumPy's global random state."""
    choices = [0, 1, -1]
    drawn = np.random.choice(choices, size=len_df)
    return pd.Series(drawn)

def get_returns(positions):
    """Return the signed relative return of each trade:
    ``(price_2 - price_1) / price_1`` multiplied by ``buy_or_sell``."""
    entry = positions.price_1
    price_move = positions.price_2 - entry
    return price_move / entry * positions.buy_or_sell

def get_simple_return(returns):
    """Return the sum of the per-trade returns."""
    total = returns.sum()
    return total

def get_compound_return(returns):
    """Return the compounded return: product of ``1 + r`` over all trades, minus 1."""
    growth = (1 + returns).prod()
    return growth - 1

def get_buyhold_return(Open):
    """Return the relative change between the first and the last open price."""
    first_price = Open.iloc[0]
    last_price = Open.iloc[-1]
    return (last_price - first_price) / first_price

def get_streaks(returns):
    """Return the longest runs of consecutive winning and losing trades.

    Args:
        returns: pandas Series of per-trade returns. A return of exactly 0
            (or NaN) ends both a winning and a losing run.

    Returns:
        (longest_positive_streak, longest_negative_streak) as ints; both are
        0 for an empty input.
    """
    def _longest_run(hits):
        if hits.empty:
            return 0
        # Every miss starts a new run id, so the cumulative count restarts
        # there. Grouping by the boolean itself (the previous approach) only
        # counted the total number of hits.
        run_id = (~hits).cumsum()
        return int(hits.astype(int).groupby(run_id).cumsum().max())

    longest_positive_streak = _longest_run(returns > 0)
    longest_negative_streak = _longest_run(returns < 0)
    return longest_positive_streak, longest_negative_streak

def get_coefficient_variation(returns):
    """Return the coefficient of variation of the returns
    (standard deviation divided by mean)."""
    return returns.std() / returns.mean()

In [None]:
# ML-based evaluation
# ML-based evaluation
def evaluate(true_signals_train,true_signals_test, predicted_signals_train, predicted_signals_test , Open):
    """Print a classification report and a simple backtest summary.

    Args:
        true_signals_train, true_signals_test: ground-truth signals; the
            report names the three classes Buy, Hold, Sell in sorted-label order.
        predicted_signals_train, predicted_signals_test: model predictions.
        Open: pandas Series of open prices for the test period, one per test
            signal, in chronological order.

    Returns:
        None; everything is printed.
    """
    # Imported here because the notebook's import cells never bring it in.
    from sklearn.metrics import classification_report

    print("ML-based evaluation \n")
    print("Train Data\n")
    target_names = ['Buy', 'Hold', 'Sell']
    print(classification_report(true_signals_train, predicted_signals_train, target_names=target_names))
    print("Test Data\n")
    print(classification_report(true_signals_test, predicted_signals_test, target_names=target_names))
    print("\n" + "-"*50 + "\n")
    print("Financial-based evaluation \n")
    print(f"Backtest Duration: {Open.index[-1] - Open.index[0]}\n")
    print("Profitability \n")
    # Copies are passed so get_positions can never alter the caller's prices.
    predicted_positions = get_positions(predicted_signals_test, Open.copy())
    predicted_returns = get_returns(predicted_positions)
    print(f"Simple/Compound Return: {get_simple_return(predicted_returns)} / {get_compound_return(predicted_returns)}" )
    print(f"Buy and Hold Return: {get_buyhold_return(Open)}")
    # Baseline: returns of uniformly random signals on the same prices.
    random_signals = generate_random_signals(len(Open))
    random_positions = get_positions(random_signals, Open.copy())
    random_returns = get_returns(random_positions)
    print(f"Simple/Compound Random Signals Return: {get_simple_return(random_returns)} / {get_compound_return(random_returns)}" )
    # Upper reference: returns obtained by trading the true signals.
    true_positions = get_positions(true_signals_test, Open.copy())
    true_returns = get_returns(true_positions)
    print(f"Simple/Compound True Signals Return: {get_simple_return(true_returns)} / {get_compound_return(true_returns)}" )
    print("\nLiquidity and Frequency \n")
    print(f"Average number of bars per trade: {predicted_positions.bars.mean()}")
    print(f"Total number of trades: {len(predicted_positions)}")
    print(f"Total number of true signals: {len(true_signals_test)}")
    print("\nTrade Specific\n")
    print(f"Maximum trade duration: {predicted_positions.bars.max()}")
    print(f"Minimum trade duration: {predicted_positions.bars.min()}")
    print(f"Average return per trade: {predicted_returns.mean()}")
    print("\nConsistency\n")
    longest_positive_streak, longest_negative_streak = get_streaks(predicted_returns)
    print(f"Winning streak: {longest_positive_streak}")
    print(f"Losing streak: {longest_negative_streak}")
    print(f"Coefficient of Variation: {  get_coefficient_variation(predicted_returns)}")
    num_trades = len(predicted_returns)
    num_winning_trades = len(predicted_returns[predicted_returns > 0])
    # Without trades the win rate is undefined; report NaN instead of dividing by zero.
    win_rate = num_winning_trades / num_trades if num_trades else float("nan")
    print(f"Win Rate: {win_rate}")

In [None]:
def plot_signals(true_signals,predicted_signals,df):
    """Plot candles with arrows marking true and predicted signals.

    True signals: red arrows for 1, green for -1. Predicted signals: orange
    arrows for 1, blue for -1. All inputs are re-indexed positionally, so the
    x axis is the bar number. Each arrow is anchored at the bar's open price.

    Args:
        true_signals, predicted_signals: sequences of -1 / 0 / 1, one per row
            of ``df``.
        df: DataFrame with ``Open``, ``High``, ``Low`` and ``Close`` columns.
    """
    true_signals = pd.Series(true_signals).reset_index(drop=True)
    predicted_signals = pd.Series(predicted_signals).reset_index(drop=True)
    df = df.reset_index(drop=True)
    fig = go.Figure(data=[go.Candlestick(x=df.index,
                  open=df['Open'],
                  high=df['High'],
                  low=df['Low'],
                  close=df['Close'])])

    # (signals, signal value, vertical arrow offset in px, arrow colour)
    arrow_specs = [
        (true_signals, 1, -30, "red"),
        (true_signals, -1, 30, "green"),
        (predicted_signals, 1, -25, "orange"),
        (predicted_signals, -1, 25, "blue"),
    ]
    for signals, value, arrow_dy, arrow_color in arrow_specs:
        for i in signals[signals == value].index:
            fig.add_annotation(x=i, y=df.Open[i], ax=0, ay=arrow_dy,
                               showarrow=True, arrowhead=1, arrowcolor=arrow_color)

    fig.show()

# ML

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import accuracy_score, mean_squared_error, r2_score
import statsmodels.api as sm

df = df.dropna()
# Chronological split: the test rows are later backtested bar by bar in
# evaluate(), which is meaningless on a shuffled sample.
df_train, df_test = train_test_split(df, test_size=0.2, shuffle=False)

# The first six columns are the model inputs; every remaining column gets its
# own base model.
features = df.columns[:6]
labels = df.columns[6:]
X_train = df_train[features]
X_test = df_test[features]

models = {}
for label in labels:
    y_train = df_train[label]

    # More than four distinct values: treat as continuous; otherwise classify.
    if y_train.nunique() > 4:
        model = LinearRegression()
    else:
        # Fixed seed so the forests are reproducible across runs.
        model = RandomForestClassifier(n_estimators=10000, random_state=42)
    model.fit(X_train, y_train)

    models[label] = model

In [None]:
# Stack the base-model predictions (one column per label) as inputs of a meta
# classifier that predicts the open-based major reversal type.
# NOTE(review): the base models predict on the same rows they were fitted on,
# so the training-set meta features are optimistic -- consider out-of-fold
# predictions.
meta_y_train = df_train["Major_Reversal_Type_Open"]
meta_y_test = df_test["Major_Reversal_Type_Open"]
meta_X_train = pd.DataFrame({label: model.predict(X_train) for label, model in models.items()})
meta_X_test = pd.DataFrame({label: model.predict(X_test) for label, model in models.items()})

# Fixed seed so the meta model is reproducible across runs.
meta_model = RandomForestClassifier(n_estimators=10000, random_state=42)
meta_model.fit(meta_X_train, meta_y_train)

In [None]:
# Report classification metrics on train/test and backtest the test-set predictions on the test open prices.
evaluate(meta_y_train,meta_y_test, meta_model.predict(meta_X_train),meta_model.predict(meta_X_test), df_test.Open)