In [None]:
import numpy as np
import pandas as pd
from scipy.signal import argrelextrema

In [None]:
# Colour palette (hex strings); both "average" colours share the same grey.
bull_css, bear_css = '#169400', '#ff1100'
bull_avg_css = bear_avg_css = '#9598a1'

# Lookup from the display label of a line style to its short code
# (the codes look like matplotlib linestyle strings - TODO confirm).
line_styles = dict(zip(('⎯⎯⎯', '----', '····'), ('-', '--', ':')))

# Input parameters
length = 5                          # window size used by the rolling/shift logic
bull_ext_last = bear_ext_last = 3
line_style = '⎯⎯⎯'                  # must be a key of `line_styles`
line_width = 1
mitigation = "Wick"                 # one of: 'Wick', 'Close'

In [None]:
# csv_path = "data/GBPUSD_3Y_H1_OHLCV.csv"
csv_path = "data/TV_OBD_Export_GBPUSD_h1.csv"

# Interprets its input as seconds since the Unix epoch.
date_parser = lambda x: pd.to_datetime(x, unit='s')

# `read_csv(..., date_parser=...)` is deprecated since pandas 2.0, so the
# 'time' column is read as-is and converted afterwards with the same callable.
df = pd.read_csv(csv_path, usecols=['time', 'open', 'high', 'low', 'close', 'Volume'])
df['time'] = date_parser(df['time'])

# Work on a copy so `df` keeps the data exactly as loaded.
data = df.copy()
data['hl2'] = (data['high'] + data['low']) / 2  # midpoint of each bar's high and low
# data = data[data['time'].dt.dayofweek < 5]
# data.reset_index(drop = True, inplace = True)

In [None]:
# Helper function to convert datetime to UNIX timestamp
def convert_to_unix(df, column):
    """Convert a datetime column of ``df`` to whole seconds since 1970-01-01.

    The column is overwritten in place on ``df`` and also returned.

    Parameters:
    df (pandas.DataFrame): Frame holding the column; it is modified.
    column (str): Name of the column to convert.

    Returns:
    pandas.Series: ``df[column]`` after the conversion.
    """
    # A column that is already numeric is returned untouched. Without this
    # guard, re-running the calling cell raised a TypeError (int - Timestamp).
    if pd.api.types.is_numeric_dtype(df[column]):
        return df[column]

    # Elapsed time since the epoch, floor-divided into whole seconds.
    df[column] = (df[column] - pd.Timestamp("1970-01-01")) // pd.Timedelta('1s')
    return df[column]

In [None]:
# Swap the 'time' column for integer seconds since the Unix epoch.
data['time'] = convert_to_unix(data, 'time')
data

In [None]:
def np_shift(a: np.ndarray, shift_value: int, axis=0, fill_value=np.nan) -> np.ndarray:
    """Shift an array along one axis, padding the vacated positions.

    Parameters:
    a (numpy.ndarray): Input array. It is not modified.
    shift_value (int): Positions to shift by. Positive moves values towards
        higher indices, negative towards lower indices.
    axis (int): Axis along which to shift.
    fill_value: Value written into the positions vacated by the shift.

    Returns:
    numpy.ndarray: A new floating-point array with the shifted values. When
    ``shift_value`` is 0 the input object itself is returned, without any
    dtype conversion.
    """
    # The default fill uses `np.nan`; the `np.NaN` alias was removed in NumPy 2.0.
    if shift_value == 0:
        return a

    # NaN cannot be stored in an integer array, so promote non-float input.
    if not np.issubdtype(a.dtype, np.floating):
        a = a.astype(np.float64)

    # np.roll wraps values around; the wrapped-in slots are overwritten below.
    result = np.roll(a=a, shift=shift_value, axis=axis)
    axes = [slice(None)] * a.ndim
    if shift_value > 0:
        axes[axis] = slice(None, shift_value)   # leading slots were wrapped in
    else:
        axes[axis] = slice(shift_value, None)   # trailing slots were wrapped in

    result[tuple(axes)] = fill_value

    return result

def pivothigh(data, left_length, right_length):
    """
    Find pivot highs in a numpy array.

    Parameters:
    data (numpy.array): Array containing the data
    left_length (int): Number of bars to the left of the pivot
    right_length (int): Number of bars to the right of the pivot

    Returns:
    numpy.array: An array where each pivot high is marked with the value of the pivot, and non-pivots are np.nan.
    When left_length == right_length the marks are shifted forward by that many positions,
    so a pivot found at index i is reported at index i + left_length.
    """
    # Use scipy's argrelextrema function to find the indices of relative highs.
    # A single window size (the larger of the two lengths) is applied to both sides.
    pivot_indices = argrelextrema(data, np.greater_equal, order=max(left_length, right_length))

    # Create an array of np.nan
    pivot_array = np.full(data.shape, np.nan)

    # Set the values at the pivot indices to the values from the data array
    pivot_array[pivot_indices] = data[pivot_indices]

    if left_length == right_length:
        # Pad the vacated leading slots with NaN (previously 0), so they are
        # not mistaken for pivots by isna()/notna() checks and the result
        # matches the documented "non-pivots are np.nan" contract.
        final_array = np_shift(pivot_array, left_length, fill_value=np.nan)
    else:
        # NOTE(review): with unequal lengths the marks are returned unshifted - confirm this is intended.
        final_array = pivot_array
    return final_array

# Example usage on a small hand-made series (replace it with your volume data).
volume_test_data = np.array([100.0, 259.0, 368.0, 249.1, 79.2, 212.0, 390.0, 212.1, 105.0])

phv = pivothigh(volume_test_data, left_length=2, right_length=2)
print(phv)

In [None]:
# Reference behaviour: pandas moves values down two rows and pads the top with NaN.
s = pd.Series(range(1, 6))
shifted_s = s.shift(2)
shifted_s

In [None]:
# The same two-position shift, done with the NumPy helper.
s = np.arange(1, 6)
np_shift(s, shift_value=2)

In [None]:
# Function to get coordinates
# def get_coordinates(condition, top, btm, ob_val, time_stamps):
#     ob_top  = []
#     ob_btm  = []
#     ob_avg  = []
#     ob_left = []

#     ob = None
#     # Append coordinates to lists
#     if condition:
#         avg = (top + btm) / 2.0
        
#         ob_top.insert(0, top)
#         ob_btm.insert(0, btm)
#         ob_avg.insert(0, avg)
#         ob_left.insert(0, time_stamps)
        
#         ob = ob_val
    
#     return ob_top, ob_btm, ob_avg, ob_left, ob

In [None]:
def remove_mitigated(ob_top, ob_btm, ob_left, ob_avg, target, bull):
    """Drop every entry whose level has been crossed by ``target``.

    The four lists are parallel (same index = same entry) and are modified
    in place: a crossed entry is removed from all of them.

    Parameters:
    ob_top (list): Top levels; the list tested when ``bull`` is falsy.
    ob_btm (list): Bottom levels; the list tested when ``bull`` is truthy.
    ob_left (list): Parallel list, trimmed alongside the others.
    ob_avg (list): Parallel list, trimmed alongside the others.
    target: Scalar or array-like compared against each level.
    bull (bool): If truthy, an entry is crossed when any ``target < bottom``;
        otherwise when any ``target > top``.

    Returns:
    bool: True if at least one entry was removed, else False.
    """
    levels = ob_btm if bull else ob_top

    # Collect positions first instead of deleting while iterating: deleting
    # inside the loop skipped the element after each removal, and
    # list.index() picked the wrong slot when two levels had the same value.
    # np.any also accepts a plain Python scalar as target.
    crossed = [
        idx
        for idx, level in enumerate(levels)
        if np.any(target < level if bull else target > level)  # or np.all, depending on your needs
    ]

    # Delete from the back so the remaining positions stay valid.
    for idx in reversed(crossed):
        del ob_top[idx]
        del ob_btm[idx]
        del ob_avg[idx]
        del ob_left[idx]

    return bool(crossed)

In [None]:
# Main logic
# Highest high / lowest low over the trailing `length` rows (current row included).
data['upper'] = data['high'].rolling(window=length).max()
data['lower'] = data['low'].rolling(window=length).min()

# Targets: rolling extremes of the close in 'Close' mode, otherwise the
# high/low extremes computed above.
if mitigation == 'Close':
    data['target_bull'] = data['close'].rolling(window=length).min()
    data['target_bear'] = data['close'].rolling(window=length).max()
else:
    data['target_bull'] = data['lower']
    data['target_bear'] = data['upper']

# os = 0 where the high from `length` rows back exceeds `upper`,
# os = 1 where the low from `length` rows back is below `lower`,
# otherwise NaN for now so the previous state can be carried forward.
data['os'] = np.where(data['high'].shift(length) > data['upper'], 0, np.where(data['low'].shift(length) < data['lower'], 1, np.nan))
# Assign the forward-filled result back: `data['os'].ffill(inplace=True)` acts
# on a temporary column object, which warns on pandas 2.x and leaves `data`
# unchanged under Copy-on-Write.
data['os'] = data['os'].ffill()

In [None]:
# Volume pivot highs, using `length` bars on both sides of each candidate.
data['phv'] = pivothigh(data['Volume'].to_numpy(), length, length)
data.tail(10)

In [None]:
# Rows where 'phv' holds a value.
data[data['phv'].notna()]

In [None]:
# Summary of the frame: column names, dtypes and non-null counts.
data.info()

In [None]:
# data.dropna(subset = ['os'],inplace = True)
# data['os'] = data['os'].astype(int)
# data.reset_index(drop = True, inplace = True)

In [None]:
# data = data.iloc[:100]
# data

In [None]:
# def get_coordinates(row, length):
#     # import pdb; pdb.set_trace()
#     condition = (row['phv'] > 1) and (row['os'] == 1)
#     top = row['hl2']
#     btm = ob_val = row['low']
#     time_stamps = row['time']
#     ob_top  = []
#     ob_btm  = []
#     ob_avg  = []
#     ob_left = []

#     ob = None
#     # Append coordinates to lists
#     if condition:
#         avg = (top + btm) / 2.0
        
#         ob_top.insert(0, top)
#         ob_btm.insert(0, btm)
#         ob_avg.insert(0, avg)
#         ob_left.insert(0, time_stamps)
#         ob = ob_val
    
#     return ob_top, ob_btm, ob_avg, ob_left, ob

# def get_coordinates(row, length, col_names):
#     condition = (row['phv'] > 1) and (row['os'] == 1)
#     top = row['hl2']
#     btm = ob_val = row['low']
#     time_stamps = row['time']

#     ob_top = ob_btm = ob_avg = ob_left = ob = None

#     # Assign values to variables if condition is met
#     if condition:
#         avg = (top + btm) / 2.0
        
#         ob_top = top
#         ob_btm = btm
#         ob_avg = avg
#         ob_left = time_stamps
#         ob = ob_val
    
#     return pd.Series([ob_top, ob_btm, ob_avg, ob_left, ob], index=col_names)

def get_bullish_coordinates(row, length):
    """Compute the bullish coordinates for a single row.

    A row qualifies when it carries a pivot value in 'phv' (not missing) and
    its 'os' equals 1. Note that this name is defined again further down the
    notebook with a one-argument signature; on a top-to-bottom run the later
    definition is the one in effect.

    Parameters:
    row (pandas.Series): Row with 'phv', 'os', 'hl2', 'low' and 'time'.
    length (int): Accepted for call compatibility; not used by the body.

    Returns:
    pandas.Series: [top, btm, avg, left, ob], positionally indexed; all NaN
    when the row does not qualify.
    """
    bull_top = bull_btm = bull_avg = bull_left = bull_ob = np.nan

    # `row['phv'] != None` was True for NaN as well, so every row with
    # os == 1 passed. pd.notna is the actual missing-value test.
    has_pivot = pd.notna(row['phv'])

    # Compute bullish coordinates
    if has_pivot and row['os'] == 1:
        bull_top = row['hl2']
        bull_btm = row['low']
        bull_avg = (row['hl2'] + row['low']) / 2
        bull_left = row['time']
        bull_ob = row['low']

    return pd.Series([bull_top, bull_btm, bull_avg, bull_left, bull_ob])

def get_bearish_coordinates(row, length):
    """Compute the bearish coordinates for a single row.

    A row qualifies when it carries a pivot value in 'phv' (not missing) and
    its 'os' equals 0. Note that this name is defined again further down the
    notebook with a one-argument signature; on a top-to-bottom run the later
    definition is the one in effect.

    Parameters:
    row (pandas.Series): Row with 'phv', 'os', 'high', 'hl2' and 'time'.
    length (int): Accepted for call compatibility; not used by the body.

    Returns:
    pandas.Series: [top, btm, avg, left, ob], positionally indexed; all NaN
    when the row does not qualify.
    """
    bear_top = bear_btm = bear_avg = bear_left = bear_ob = np.nan

    # `row['phv'] != None` was True for NaN as well, so every row with
    # os == 0 passed. pd.notna is the actual missing-value test.
    has_pivot = pd.notna(row['phv'])

    # Compute bearish coordinates
    if has_pivot and row['os'] == 0:
        bear_top = row['high']
        bear_btm = row['hl2']
        bear_avg = (row['high'] + row['hl2']) / 2
        bear_left = row['time']
        bear_ob = row['high']

    return pd.Series([bear_top, bear_btm, bear_avg, bear_left, bear_ob])



# Apply the function to each row
# results = data.shift(length).apply(get_coordinates, axis=1, args=(length,))

# Apply the function to each row
# results = data.apply(get_coordinates, axis=1, args=(length,))

In [None]:
# results = pd.DataFrame(results.tolist(), columns=['bull_top', 'bull_btm', 'bull_avg', 'bull_left', 'bull_ob'])
# Apply the function to each row
# data[['bull_top', 'bull_btm', 'bull_avg', 'bull_left', 'bull_ob']] = data.apply(get_bullish_coordinates, axis=1, args=(length,))
# data[['bear_top', 'bear_btm', 'bear_avg', 'bear_left', 'bear_ob']] = data.apply(get_bearish_coordinates, axis=1, args=(length,))
# data.to_csv("data/GU_TV_Export_FE.csv", index = False)
# data

In [None]:
def get_bullish_coordinates(row):
    """Compute the bullish coordinates for a single row from its shifted columns.

    A row qualifies when it carries a pivot value in 'phv' (not missing) and
    its 'os' equals 1. The coordinates are read from the '*_shifted' columns.

    Parameters:
    row (pandas.Series): Row with 'phv', 'os', 'hl2_shifted', 'low_shifted'
        and 'time_shifted'.

    Returns:
    pandas.Series: [top, btm, avg, left, ob], positionally indexed; all NaN
    when the row does not qualify.
    """
    bull_top = bull_btm = bull_avg = bull_left = bull_ob = np.nan

    # `row['phv'] != None` was True for NaN as well, so every row with
    # os == 1 passed. pd.notna is the actual missing-value test.
    has_pivot = pd.notna(row['phv'])

    # Compute bullish coordinates
    if has_pivot and row['os'] == 1:
        bull_top = row['hl2_shifted']
        bull_btm = row['low_shifted']
        bull_avg = (row['hl2_shifted'] + row['low_shifted']) / 2
        bull_left = row['time_shifted']
        bull_ob = row['low_shifted']

    return pd.Series([bull_top, bull_btm, bull_avg, bull_left, bull_ob])

def get_bearish_coordinates(row):
    """Compute the bearish coordinates for a single row from its shifted columns.

    A row qualifies when it carries a pivot value in 'phv' (not missing) and
    its 'os' equals 0. The coordinates are read from the '*_shifted' columns.

    Parameters:
    row (pandas.Series): Row with 'phv', 'os', 'high_shifted', 'hl2_shifted'
        and 'time_shifted'.

    Returns:
    pandas.Series: [top, btm, avg, left, ob], positionally indexed; all NaN
    when the row does not qualify.
    """
    bear_top = bear_btm = bear_avg = bear_left = bear_ob = np.nan

    # `row['phv'] != None` was True for NaN as well, so every row with
    # os == 0 passed. pd.notna is the actual missing-value test.
    has_pivot = pd.notna(row['phv'])

    # Compute bearish coordinates
    if has_pivot and row['os'] == 0:
        bear_top = row['high_shifted']
        bear_btm = row['hl2_shifted']
        bear_avg = (row['high_shifted'] + row['hl2_shifted']) / 2
        bear_left = row['time_shifted']
        bear_ob = row['high_shifted']

    return pd.Series([bear_top, bear_btm, bear_avg, bear_left, bear_ob])



In [None]:
# Add a '<name>_shifted' copy of each source column, moved down by `length` rows.
for col in ('hl2', 'low', 'high', 'time'):
    data[col + '_shifted'] = data[col].shift(length)

In [None]:
# Output columns, in the order the coordinate functions return their values.
bull_cols = ['bull_top', 'bull_btm', 'bull_avg', 'bull_left', 'bull_ob']
bear_cols = ['bear_top', 'bear_btm', 'bear_avg', 'bear_left', 'bear_ob']

# Row-wise computation; each call yields five values per row.
data[bull_cols] = data.apply(get_bullish_coordinates, axis=1)
data[bear_cols] = data.apply(get_bearish_coordinates, axis=1)

# Save the enriched frame, without the index column.
data.to_csv("data/GU_TV_data_Python_FeatureEngineered.csv", index=False)
data

In [None]:
# Rows where 'bull_ob' holds a value.
data[data["bull_ob"].notna()]

In [None]:
# Rows where 'bear_ob' holds a value.
data[data["bear_ob"].notna()]

In [None]:
# Show the last 10 rows of the frame.
data.tail(10)