In [2]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta, timezone, date
from scipy.signal import find_peaks
import pytz
import time
from IPython.display import clear_output

In [3]:
#Functions of the sell diagram ratio module

def convert_minima_to_HH_HL_df(Pay, p1_range_localminimas):
    """Return the local-minimum row whose ``low`` equals ``Pay``, reshaped for HH_HL_df.

    Parameters
    ----------
    Pay : scalar
        Low price to look up.
    p1_range_localminimas : pandas.DataFrame
        Frame containing at least the columns ``open``, ``high``, ``low``
        and ``close``.

    Returns
    -------
    pandas.DataFrame
        The matching rows without ``open``/``high``/``close`` and with ``low``
        renamed to ``price``. If several rows share the same low, only the
        last one is kept. Original index labels are preserved, and the result
        is empty when nothing matches.
    """
    matching_rows = p1_range_localminimas.loc[p1_range_localminimas['low'] == Pay]
    slim = matching_rows.drop(columns=['open', 'high', 'close'])
    last_only = slim.drop_duplicates(subset=['low'], keep='last')
    return last_only.rename(columns={'low': 'price'})

def convert_maxima_to_HH_HL_df(Pay, condition, p1_range_localmaximas):
    """Return the local-maximum row whose ``high`` equals ``Pay``, reshaped for HH_HL_df.

    Parameters
    ----------
    Pay : scalar
        High price to look up.
    condition : boolean mask
        Extra row filter that is AND-ed with the price match.
    p1_range_localmaximas : pandas.DataFrame
        Frame containing at least the columns ``open``, ``high``, ``low``
        and ``close``.

    Returns
    -------
    pandas.DataFrame
        The selected rows without ``open``/``low``/``close`` and with ``high``
        renamed to ``price``. If several rows share the same high, only the
        last one is kept. Original index labels are preserved.
    """
    selector = (p1_range_localmaximas['high'] == Pay) & condition
    slim = p1_range_localmaximas.loc[selector].drop(columns=['open', 'low', 'close'])
    last_only = slim.drop_duplicates(subset=['high'], keep='last')
    return last_only.rename(columns={'high': 'price'})
    
def compute_index_Pa_n(Pay, p1_range_localminimas):
    """Return the index label of the last row whose ``low`` equals ``Pay``.

    Raises
    ------
    IndexError
        If no row matches (this includes ``Pay`` being NaN, which never
        compares equal to anything).
    """
    same_low = p1_range_localminimas['low'].eq(Pay)
    return p1_range_localminimas.loc[same_low].index[-1]

def create_tuples_p1_p3(df):
    """List candidate index triples that all share the last row of ``df``.

    With ``m = df.index.max()``, the result holds ``((m + 1) // 2) - 1``
    tuples of the form ``(m, m - 2*k + 1, m - 2*k)`` for ``k = 1, 2, ...``.
    The first element is always the last index; the other two walk backwards
    through ``df`` two rows at a time.

    Returns
    -------
    list of tuple
        Empty when the formula yields zero or a negative count.
    """
    last = df.index.max()
    combo_count = ((last + 1) // 2) - 1
    return [(last, last - 2 * k + 1, last - 2 * k) for k in range(1, combo_count + 1)]

### Start of backtest-only section (to delete)

In [4]:
# Backtest only: the values used here come from a saved CSV file.

# Location of the CSV file to import
path = "./OHLC_data/GBPUSD_20250109_20250111.csv"

# Read the file and parse the 'time' column into datetimes.
OHLC_df = pd.read_csv(path).assign(
    time=lambda frame: pd.to_datetime(frame['time'], format='%Y-%m-%d %H:%M:%S')
)

In [5]:
# DATA TO BE TESTED
# Keep the rows inside the (inclusive) time window, then the final 110 of them.
mask = OHLC_df["time"].between("2025-01-10 12:02:00", "2025-01-10 15:18:00")
Last_110 = OHLC_df.loc[mask].tail(110)
print(Last_110)

                    time     open     high      low    close
2728 2025-01-10 13:29:00  1.22941  1.22948  1.22936  1.22945
2729 2025-01-10 13:30:00  1.22945  1.22980  1.22945  1.22966
2730 2025-01-10 13:31:00  1.22965  1.22974  1.22961  1.22974
2731 2025-01-10 13:32:00  1.22974  1.22977  1.22974  1.22975
2732 2025-01-10 13:33:00  1.22975  1.22986  1.22974  1.22986
...                  ...      ...      ...      ...      ...
2833 2025-01-10 15:14:00  1.23126  1.23143  1.23126  1.23129
2834 2025-01-10 15:15:00  1.23138  1.23138  1.23120  1.23128
2835 2025-01-10 15:16:00  1.23126  1.23145  1.23124  1.23144
2836 2025-01-10 15:17:00  1.23144  1.23155  1.23137  1.23144
2837 2025-01-10 15:18:00  1.23143  1.23157  1.23133  1.23154

[110 rows x 5 columns]


### End of backtest-only section (to delete)

In [6]:
# FUNCTION TO DETECT P1 P2 P3 for the EXTERNAL AND INTERNAL
def obtain_p1_p2_p3(Last_110):
    """Build the swing table between the lowest low and the highest high and pick a P1-P2-P3.

    The function
      1. locates the last candle holding the lowest ``low`` and the last candle
         holding the highest ``high`` of ``Last_110`` and keeps the rows whose
         ``time`` lies between the two (inclusive);
      2. finds local maxima of ``high`` and local minima of ``low`` in that
         sub-range with ``scipy.signal.find_peaks``;
      3. assembles ``HH_HL_df``: the lowest low, then alternating selected
         highs and lows, and finally the highest high;
      4. tests the candidate index triples from ``create_tuples_p1_p3`` against
         the price-ratio and time filters and reports the first one that passes.

    Parameters
    ----------
    Last_110 : pandas.DataFrame
        Candle data with the columns ``time``, ``open``, ``high``, ``low`` and
        ``close``. Several steps read column 0 as the time and column 3 as the
        low by position, so the columns must be in that order.

    Returns
    -------
    tuple
        ``(p1_range_localminimas, p1_range_localmaximas, HH_HL_df, p1, p2, p3)``.
        ``p1``, ``p2`` and ``p3`` are ``time``/``price`` frames: one row each
        when a combination passes the filters, empty otherwise (a message is
        printed in that case).

    Raises
    ------
    IndexError
        If the sub-range contains no local minimum. This includes the case
        where the lowest low occurs after the highest high, which leaves the
        sub-range empty.
    """
    #obtaining the maximum point from the last 110 minutes for sells
    Max_110_y = Last_110["high"].max()
    Max_110 = Last_110[Last_110["high"] == Max_110_y].iloc[[-1]]

    #obtaining the minimum point from the last 110 minutes for sells
    Min_110_y = Last_110["low"].min()
    Min_110 = Last_110[Last_110["low"] == Min_110_y].iloc[[-1]]

    # Sub-range from the minimum candle to the maximum candle (both inclusive),
    # re-indexed from 0 so find_peaks positions can be used as index labels.
    p1_range_start_time = Min_110["time"].iloc[-1]
    p1_range_end_time = Max_110['time'].iloc[-1]
    mask = Last_110["time"].ge(p1_range_start_time) & Last_110["time"].le(p1_range_end_time)
    p1_range = Last_110[mask].reset_index(drop=True)

    # Positions of the local maxima of the highs
    local_maximas_i, _ = find_peaks(p1_range['high'].values)

    # Positions of the local minima of the lows (found by inverting the data)
    local_minimas_i, _ = find_peaks(-p1_range['low'].values)

    #Obtaining the dataframe of the local minimas and local maximas
    p1_range_localminimas = p1_range[p1_range.index.isin(local_minimas_i)].reset_index(drop=True)
    p1_range_localmaximas = p1_range[p1_range.index.isin(local_maximas_i)].reset_index(drop=True)

    # Finding the value of Pay1
    Pay1 = Min_110_y

    #to find the y value of the possible 2nd local minima:
    Pay3 = p1_range_localminimas["low"].min()
    Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
    Pay3_index_range = Pay3_index

    # Search for the first (Pay2, Pay3) pair: Pay2 is the highest local maximum
    # between the start of the range and the Pay3 candle. The pair is accepted
    # once Pay2 is not below the high of the Pay3 candle; otherwise Pay3 moves
    # to the lowest local minimum further along the range.
    while True:
        mask = p1_range["low"] == Pay3
        Pay3_candle = p1_range[mask].iloc[0]
        Pay3_high = Pay3_candle["high"]
        try:
            # local maxima between the first candle of the range and the Pay3 candle
            condition1 = p1_range_localmaximas["time"].ge(p1_range.iloc[0,0])
            condition2 = p1_range_localmaximas["time"].le(p1_range_localminimas.iloc[Pay3_index,0])
            condition = condition1 & condition2

            #to find the y value of the 1st local maxima:
            Pay2 = p1_range_localmaximas[condition == True]['high'].values
            if len(Pay2) == 0:
                #proceed with the next pay3 index and rerun the while loop
                Pay3_index_range = Pay3_index_range + 1
                Pay3 = p1_range_localminimas.iloc[Pay3_index_range:,3].min()
                Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
                continue
            else:
                #if value exists, find the maximum
                Pay2 = max(Pay2)
                if Pay2 < Pay3_high:
                    Pay3_index_range = Pay3_index_range + 1
                    Pay3 = p1_range_localminimas.iloc[Pay3_index_range:,3].min()
                    Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
                else:
                    break
        except IndexError as err1:
            # Raised when the local minima are exhausted: the slice minimum is
            # NaN and compute_index_Pa_n finds no matching row.
            print("IndexError:", err1)
            break
        except ValueError as ve:
            print("ValueError:", ve)
            break
        except TypeError as te:
            print("TypeError:", te)
            break

    #Making a new dataframe of the higher highs and higher lows approaching P3 in chronological order
    HH_HL_df = pd.DataFrame({'time': pd.Series(dtype='datetime64[ns]'), 'price': pd.Series(dtype='float')})

    #Grabbing the data attached to Pay*
    mask = Last_110['low'] == Pay1
    pa1 = Last_110[mask].drop(['open','high','close'], axis=1).drop_duplicates(subset=['low'], keep='last').rename(columns={"low":"price"})
    HH_HL_df = pd.concat([HH_HL_df, pa1], ignore_index=True)

    Pa2 = p1_range_localmaximas[p1_range_localmaximas['high'] == Pay2].drop(['open','low','close'], axis=1)
    Pa2 = Pa2.drop_duplicates(subset=['high'], keep='last')
    Pa2 = Pa2.rename(columns={"high":"price"})
    HH_HL_df = pd.concat([HH_HL_df, Pa2], ignore_index=True)
    HH_HL_df = pd.concat([HH_HL_df, convert_minima_to_HH_HL_df(Pay3, p1_range_localminimas)], ignore_index=True)

    #Assigning new values to find new sets of Pa2, Pa3
    Pay1 = Pay3
    Pay1_index = compute_index_Pa_n(Pay1, p1_range_localminimas)

    #Finding the initial value of Pay3
    Pay3 = p1_range_localminimas.iloc[Pay1_index+1:,3].min()
    Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)

    #Finding the values of the higher lows and higher highs and appending it to the HH_HL_df
    while True:
        Pay3_index_range = 0
        try:
            # Defensive bound check: compute_index_Pa_n only returns labels that
            # exist, so a valid Pay3_index is always below the frame length.
            if Pay3_index >= len(p1_range_localminimas):
                print(f"End of p1_range_localminimas reached.{Pay3_index}")
                break

            # local maxima between the current Pay1 candle and the Pay3 candle
            condition1 = p1_range_localmaximas["time"].ge(p1_range_localminimas.iloc[Pay1_index,0])
            condition2 = p1_range_localmaximas["time"].le(p1_range_localminimas.iloc[Pay3_index,0])
            condition = condition1 & condition2

            #to find the y value of the next local maxima:
            Pay2 = p1_range_localmaximas[condition == True]['high'].values

            # price of the row just before the current Pay1 row in HH_HL_df
            # (the previously recorded high)
            pHH_index = HH_HL_df[HH_HL_df['price'] == Pay1].index[0] - 1
            pHHy = HH_HL_df.iloc[pHH_index,1]

            #if Pay2 isnt existing for the current Pay3
            condition3 = len(Pay2) == 0

            #if the current Pay2 is less than the previous HH
            try:
                condition4 = p1_range_localmaximas[condition == True]['high'].values.max() < pHHy
            except ValueError:
                # .max() of an empty array: no local maximum in the window, so
                # move Pay3 to the lowest local minimum after the current one.
                Pay3_index = Pay3_index+1
                Pay3 = p1_range_localminimas.iloc[Pay3_index:,3].min()
                Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
                continue
            if condition3 or condition4:
                #proceed with the next pay3 index and rerun the while loop
                Pay2 = 0
                Pay3_index_range = Pay3_index+1
                Pay3 = p1_range_localminimas.iloc[Pay3_index_range:,3].min()
                Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
                continue
            else:
                Pay2=Pay2.max()
                #Appending to the HH_HL_df
                condition5 = (p1_range_localmaximas['high'] == Pay2) & condition
                HH_HL_df = pd.concat([HH_HL_df, convert_maxima_to_HH_HL_df(Pay2, condition5, p1_range_localmaximas)], ignore_index=True)
                HH_HL_df = pd.concat([HH_HL_df, convert_minima_to_HH_HL_df(Pay3, p1_range_localminimas)], ignore_index=True)

                #Assigning new values to find new sets of Pa2, Pa3
                Pay1 = Pay3
                Pay1_index = compute_index_Pa_n(Pay1, p1_range_localminimas)

                #Finding the initial value of Pay3
                Pay3 = p1_range_localminimas.iloc[Pay1_index+1:,3].min()
                Pay3_index = compute_index_Pa_n(Pay3, p1_range_localminimas)
                continue
        except IndexError:
            # No further local minimum (or no matching row): the walk is finished.
            break

    #Append Max_110 to the dataframe:
    Max_110 = Max_110.drop(['open','low','close'], axis=1).rename(columns={"high":"price"})
    HH_HL_df = pd.concat([HH_HL_df, Max_110], ignore_index=True)

    # Create a tuple combining the list of possible p1-p2-p3 (initial values)
    p1_p2_p3i = create_tuples_p1_p3(HH_HL_df)

    # finding the right p1-p2-p3 in terms of ratios
    p1_p2_p3_index = []
    for p3i, p2i, p1i in p1_p2_p3i:
        # price legs (column 1) and time legs (column 0) of the candidate
        p3y_p2y = HH_HL_df.iloc[p3i,1] - HH_HL_df.iloc[p2i,1]
        p2y_p1y = HH_HL_df.iloc[p2i,1] - HH_HL_df.iloc[p1i,1]
        p3x_p2x = HH_HL_df.iloc[p3i,0] - HH_HL_df.iloc[p2i,0]
        p2x_p1x = HH_HL_df.iloc[p2i,0] - HH_HL_df.iloc[p1i,0]

        # conditions for finding the right p1-p2-p3
        condition1 = 1.027 < abs(p3y_p2y/p2y_p1y) < 1.9286
        try:
            condition2 = 0.333 < p3x_p2x/p2x_p1x < 5
        except ZeroDivisionError:
            # zero-length P1->P2 time leg
            condition2 = False
        condition3 = p2x_p1x > pd.Timedelta(minutes=1)
        condition4 = pd.Timedelta(minutes=2) <= p2x_p1x <= pd.Timedelta(minutes=24)
        condition5 = pd.Timedelta(minutes=2) <= p3x_p2x <= pd.Timedelta(minutes=21)
        condition = condition1 & condition2 & condition3 & condition4 & condition5

        if condition:
            p1_p2_p3_index.append((p3i, p2i, p1i))

    # Typed empty frames so the return value has the same columns and dtypes
    # whether or not a combination was found.
    p1 = pd.DataFrame({'time': pd.Series(dtype='datetime64[ns]'), 'price': pd.Series(dtype='float')})
    p2 = pd.DataFrame({'time': pd.Series(dtype='datetime64[ns]'), 'price': pd.Series(dtype='float')})
    p3 = pd.DataFrame({'time': pd.Series(dtype='datetime64[ns]'), 'price': pd.Series(dtype='float')})

    if len(p1_p2_p3_index) == 0:
        print("There are no P1-P2-P3 yet. Let's keep waiting...")
        return p1_range_localminimas, p1_range_localmaximas, HH_HL_df, p1, p2, p3

    # Only the first accepted combination is reported. The previous version
    # looped over every accepted combination but returned inside the loop body,
    # so it also stopped after the first one; that behaviour is kept here.
    # NOTE(review): confirm whether all accepted combinations should be
    # returned instead.
    p3i, p2i, p1i = p1_p2_p3_index[0]
    p3 = pd.concat([p3, HH_HL_df.iloc[[p3i]]], ignore_index=True)
    p2 = pd.concat([p2, HH_HL_df.iloc[[p2i]]], ignore_index=True)
    p1 = pd.concat([p1, HH_HL_df.iloc[[p1i]]], ignore_index=True)
    return p1_range_localminimas, p1_range_localmaximas, HH_HL_df, p1, p2, p3

### FIND THE P1_internal_range

In [22]:
# Run the detection on the test window so that p1, p2 and p3 are defined when
# the notebook is executed top to bottom, then show the time of the last P2 row.
p1_range_localminimas, p1_range_localmaximas, HH_HL_df, p1, p2, p3 = obtain_p1_p2_p3(Last_110)
p2["time"].iloc[-1]

Timestamp('2025-01-10 14:55:00')

In [17]:
# The internal range runs from the last P2 time to the last P3 time (inclusive).
p1_internal_range_start_time, p1_internal_range_end_time = (
    p2["time"].iloc[-1],
    p3["time"].iloc[-1],
)

print(p1_internal_range_start_time)
print(p1_internal_range_end_time)
mask = Last_110["time"].between(p1_internal_range_start_time, p1_internal_range_end_time)
p1_internal_range = Last_110.loc[mask]
print(p1_internal_range)

2025-01-10 14:55:00
2025-01-10 15:03:00
                    time     open     high      low    close
2814 2025-01-10 14:55:00  1.23143  1.23155  1.23137  1.23147
2815 2025-01-10 14:56:00  1.23147  1.23157  1.23147  1.23157
2816 2025-01-10 14:57:00  1.23157  1.23171  1.23156  1.23161
2817 2025-01-10 14:58:00  1.23159  1.23168  1.23159  1.23163
2818 2025-01-10 14:59:00  1.23161  1.23165  1.23142  1.23157
2819 2025-01-10 15:00:00  1.23159  1.23174  1.23155  1.23157
2820 2025-01-10 15:01:00  1.23157  1.23177  1.23148  1.23177
2821 2025-01-10 15:02:00  1.23176  1.23203  1.23175  1.23202
2822 2025-01-10 15:03:00  1.23204  1.23215  1.23200  1.23215


In [18]:
# Repeat the detection on the internal range only.
(
    p1_range_localminimas_internal,
    p1_range_localmaximas_internal,
    HH_HL_df_internal,
    p1_internal,
    p2_internal,
    p3_internal,
) = obtain_p1_p2_p3(p1_internal_range)

There are no P1-P2-P3 yet. Let's keep waiting...


In [19]:
# Display the time/price table returned by obtain_p1_p2_p3 for the internal range.
HH_HL_df_internal

Unnamed: 0,time,price
0,2025-01-10 14:55:00,1.23137
1,2025-01-10 14:57:00,1.23171
2,2025-01-10 14:59:00,1.23142
3,2025-01-10 15:03:00,1.23215


### FIND P4 RANGE

In [None]:
# The P4 range runs from the last P3 time to the last candle of Last_110 (inclusive).
p4_range_start_time, p4_range_end_time = (
    p3["time"].iloc[-1],
    Last_110["time"].iloc[-1],
)
print(p4_range_start_time)
print(p4_range_end_time)
mask = Last_110["time"].between(p4_range_start_time, p4_range_end_time)
p4_range = Last_110.loc[mask]
print(p4_range)

In [None]:
# Initial value of P4: the row(s) of the P4 range holding the minimum low,
# reduced to the columns time/price.
mask = p4_range["low"].eq(p4_range["low"].min())
p4 = (
    p4_range.loc[mask]
    .drop(columns=['open', 'high', 'close'])
    .rename(columns={"low": "price"})
)
p4