In [1]:
import numpy as np
import pandas as pd

In [2]:
# Relative directory that holds the price CSV read in the next cell.
file_path = '../bar_movement/data/'
# Instrument name; it is interpolated into the CSV file name when the data is loaded.
currency_pair = 'Eur_Usd'

In [3]:
# Load the H4 candles, parse the Date column as timezone-aware UTC timestamps
# and make sure the frame carries a clean 0..n-1 positional index.
df = (
    pd.read_csv(file_path + f'Oanda_{currency_pair}_H4_2022-2023.csv')
    .assign(Date=lambda frame: pd.to_datetime(frame['Date'], utc=True))
    .reset_index(drop=True)
)

In [16]:
def atr(high, low, close, lookback=14):
    """Average true range as a simple rolling mean of the true range.

    The true range of a bar is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``.

    Args:
        high, low, close: pandas Series of bar prices sharing one index.
        lookback: Number of bars in the rolling mean window.

    Returns:
        pandas Series of the averaged true range; the first ``lookback - 1``
        rows are NaN because the rolling window is not yet full.
    """
    prev_close = close.shift()
    candidates = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    )
    # Row-wise maximum skips NaN, so the first bar (which has no previous
    # close) falls back to its plain high-low range.
    true_range = candidates.max(axis=1)

    return true_range.rolling(lookback).mean()

def wma(arr, period=20):
    """Linearly weighted moving average (newest sample weighted highest).

    Each output value is ``sum(arr[i - j] * (period - j)) / sum(1..period)``
    for ``j = 0 .. period - 1``. Samples before the start of ``arr`` are
    treated as zero, so the first ``period - 1`` outputs are partial sums
    rather than NaN.

    Args:
        arr: 1-D sequence of numbers (list, ndarray or pandas Series).
        period: Window length; must be a positive integer.

    Returns:
        numpy.ndarray with exactly ``len(arr)`` elements.

    Raises:
        ValueError: If ``period`` is smaller than 1.
    """
    if period < 1:
        raise ValueError("period must be a positive integer")

    values = np.asarray(arr, dtype=float)
    if values.size == 0:
        # np.convolve rejects empty input; an empty average is simply empty.
        return values

    weights = np.arange(period, 0, -1, dtype=float)
    weights /= weights.sum()

    # A 'full' convolution truncated to the input length is the causal filter.
    # Unlike 'same' mode with a zero-padded kernel, the output length always
    # matches the input (even when the input is shorter than the window) and
    # a NaN/inf in a later sample cannot contaminate earlier outputs.
    return np.convolve(values, weights, 'full')[:values.size]

def hma(close, length=20):
    """Hull moving average assembled from three weighted moving averages.

    A half-length WMA is doubled, the full-length WMA is subtracted from it,
    and the difference is smoothed by a WMA whose window is the integer part
    of ``sqrt(length)``.

    Args:
        close: 1-D sequence of prices accepted by ``wma``.
        length: Base window length.

    Returns:
        Whatever ``wma`` returns for the final smoothing pass.
    """
    half_window = int(length / 2)
    smoothing_window = int(length ** 0.5)

    fast = wma(close, half_window)
    slow = wma(close, length)

    return wma(2 * fast - slow, smoothing_window)

def calculate_ssl(close, high, low, ssl_len=10):
    """Compute the two SSL channel lines from rolling means of high and low.

    A direction state is tracked per bar: it becomes 1 when the close is
    above the rolling mean of the highs, -1 when the close is below the
    rolling mean of the lows, and otherwise keeps its previous value. The
    first bar always starts at 0. While the state is negative the two
    rolling means swap roles.

    The computation is positional, so it works for any index on ``close``
    (for example a DatetimeIndex or a sliced frame), not only 0..n-1.

    Args:
        close, high, low: pandas Series of equal length.
        ssl_len: Window of the rolling means.

    Returns:
        Tuple ``(ssl_down, ssl_up)`` of float Series indexed like ``close``.
        Rows where the rolling means are not yet available are NaN.
    """
    sma_high = high.rolling(window=ssl_len).mean().to_numpy(dtype=float)
    sma_low = low.rolling(window=ssl_len).mean().to_numpy(dtype=float)
    close_values = close.to_numpy(dtype=float)

    # np.select keeps the original priority: the "above the highs" test wins
    # over the "below the lows" test; bars matching neither stay NaN so the
    # forward fill below can carry the previous state.
    direction = np.select(
        [close_values > sma_high, close_values < sma_low],
        [1.0, -1.0],
        default=np.nan,
    )
    if direction.size:
        # The first bar is seeded with a neutral state regardless of price.
        direction[0] = 0.0
    hlv = pd.Series(direction, index=close.index).ffill().to_numpy()

    bearish = hlv < 0
    ssl_down = pd.Series(np.where(bearish, sma_high, sma_low), index=close.index, dtype=float)
    ssl_up = pd.Series(np.where(bearish, sma_low, sma_high), index=close.index, dtype=float)

    return ssl_down, ssl_up

def absolute_strength(close, length=6, smooth=3):
    """Smoothed averages of upward and downward bar-to-bar price changes.

    The bar-to-bar change is split into its positive part ("bulls") and the
    magnitude of its negative part ("bears"). Each part is averaged over
    ``length`` bars and then averaged again over ``smooth`` bars.

    Args:
        close: pandas Series of closing prices.
        length: Window of the first simple moving average.
        smooth: Window of the second simple moving average.

    Returns:
        Tuple ``(smoothed_bulls, smoothed_bears)`` of pandas Series.
    """
    def _sma(series, window):
        # Simple moving average over `window` rows.
        return series.rolling(window=window).mean()

    current = _sma(close, 1)
    previous = _sma(close.shift(1), 1)
    change = current - previous
    magnitude = change.abs()

    # 0.5 * (|d| + d) keeps only rises; 0.5 * (|d| - d) keeps only falls.
    bulls = 0.5 * (magnitude + change)
    bears = 0.5 * (magnitude - change)

    smoothed_bulls = _sma(_sma(bulls, length), smooth)
    smoothed_bears = _sma(_sma(bears, length), smooth)

    return smoothed_bulls, smoothed_bears

def tdf(close, lookback=13, n_length=3, filter_low=-0.05, filter_high=0.05):
    """Normalised oscillator built from two nested exponential averages.

    The close (scaled by 1000) is smoothed with an EMA and that EMA is
    smoothed again. The absolute gap between the two averages is multiplied
    by their mean one-bar change raised to ``n_length``, and the product is
    divided by the rolling maximum of its own absolute value over
    ``lookback * n_length`` bars.

    Args:
        close: pandas Series of closing prices.
        lookback: Span of both EMAs; also scales the normalisation window.
        n_length: Exponent applied to the averaged one-bar change.
        filter_low: Values strictly below this mark the first boolean mask.
        filter_high: Values strictly above this mark the second boolean mask.

    Returns:
        Tuple ``(ntdf, ntdf < filter_low, ntdf > filter_high)``.
    """
    scaled_close = close * 1000
    fast_ema = scaled_close.ewm(span=lookback).mean()
    slow_ema = fast_ema.ewm(span=lookback).mean()

    fast_step = fast_ema - fast_ema.shift(1)
    slow_step = slow_ema - slow_ema.shift(1)
    mean_step = (fast_step + slow_step) / 2
    gap = np.abs(fast_ema - slow_ema)

    raw = gap * np.power(mean_step, n_length)
    # Normalise by the largest magnitude seen in the trailing window.
    trailing_peak = np.abs(raw).rolling(window=lookback * n_length).max()
    ntdf = raw / trailing_peak

    return ntdf, ntdf < filter_low, ntdf > filter_high

def rvi(opens, high, low, close, length=4):
    """Relative vigor style ratio and its smoothed signal line.

    ``close - opens`` and ``high - low`` are each smoothed with a symmetric
    4-tap filter (weights 1/6, 2/6, 2/6, 1/6), summed over ``length`` bars,
    and divided. The signal line is the same 4-tap filter applied to the
    ratio.

    Both outputs have the same length and index as the inputs. The filter is
    causal and treats samples before the start of the series as zero, so the
    first three smoothed values are partial sums.

    Args:
        opens, high, low, close: pandas Series (or 1-D arrays) of equal length.
        length: Window of the rolling sums.

    Returns:
        Tuple ``(rvi_line, signal)`` of pandas Series.
    """
    weights = np.array([1/6, 2/6, 2/6, 1/6])

    def swma(series):
        # Keep the caller's index when there is one; plain arrays get a
        # default index from pandas.
        index = getattr(series, 'index', None)
        values = np.asarray(series, dtype=float)
        if values.size == 0:
            # np.convolve rejects empty input.
            return pd.Series(values, index=index)
        # Truncating the 'full' convolution to the input length drops the
        # three trailing rows that have no matching input bar.
        smoothed = np.convolve(values, weights, 'full')[:values.size]
        return pd.Series(smoothed, index=index)

    numerator = swma(close - opens).rolling(window=length).sum()
    denominator = swma(high - low).rolling(window=length).sum()
    rvi_line = numerator / denominator
    sig = swma(rvi_line)

    return rvi_line, sig

def super_ichi(high, low, close, tenkan_len=9, tenkan_mult=2.0, kijun_len=21, kijun_mult=4.0, spanB_len=52, spanB_mult=6.0, offset=25):
    """Build three ATR-band-based lines and return two of them shifted forward.

    The inner helper ``avg`` is evaluated three times with different
    (length, multiplier) pairs to produce ``tenkan``, ``kijun`` and
    ``senkouB``; ``senkouA`` is the mean of ``kijun`` and ``tenkan``.

    Args:
        high, low, close: Bar prices. Elements are read with integer keys
            (``close[i]``, ``up[i]``).
            NOTE(review): with pandas Series those keys look label-based, so
            this presumably expects a default 0..n-1 index -- confirm.
        tenkan_len, tenkan_mult: ATR length and multiplier for ``tenkan``.
        kijun_len, kijun_mult: ATR length and multiplier for ``kijun``.
        spanB_len, spanB_mult: ATR length and multiplier for ``senkouB``.
        offset: Number of rows ``senkouA`` and ``senkouB`` are shifted by.

    Returns:
        Tuple ``(senkouA, senkouB, kijun)``. The first two are wrapped in new
        pandas Series and shifted by ``offset`` rows; ``kijun`` is returned
        unshifted, exactly as produced by ``avg``.
    """
    def avg(length, mult):
        """Midpoint of a running max/min tracked around ATR bands of hl2."""
        # Raw bands: bar midpoint plus/minus a multiple of the ATR.
        curr_atr = atr(high, low, close, length) * mult
        hl2 = (high + low) / 2
        up = hl2 + curr_atr
        dn = hl2 - curr_atr
        # Element 0 of both arrays stays at its zero initial value; the loops
        # below start at 1.
        upper, lower = np.zeros_like(close), np.zeros_like(close)

        for i in range(1, len(close)):
            # While the previous close stayed below the previous upper band the
            # band may only move down; otherwise it resets to the raw band.
            upper[i] = min(up[i], upper[i - 1]) if close[i - 1] < upper[i - 1] else up[i]
            # Mirror rule: the lower band may only move up while the previous
            # close stayed above it.
            lower[i] = max(dn[i], lower[i - 1]) if close[i - 1] > lower[i - 1] else dn[i]

        os, max_val, min_val = np.zeros_like(close), np.zeros_like(close), np.zeros_like(close)

        def _cross(i, spt):
            # True when close moves from strictly one side of spt at i - 1 to
            # the other side (or exactly onto it) at i. The same spt value is
            # used for both bars.
            return ((close[i - 1] < spt) and (close[i] >= spt)) or ((close[i - 1] > spt) and (close[i] <= spt))

        for i in range(1, len(close)):
            # State: 1 once close exceeds the upper band, 0 once it drops below
            # the lower band, otherwise carried over from the previous bar.
            os[i] = 1 if close[i] > upper[i] else (0 if close[i] < lower[i] else os[i - 1])
            # Reference level: lower band in state 1, upper band in state 0.
            spt = lower[i] if os[i] == 1 else upper[i]
            # Running maximum: extended on a cross or in state 1 (both branches
            # give the same value), otherwise reset to spt.
            max_val[i] = max(close[i], max_val[i - 1]) if _cross(i, spt) else (max(max_val[i - 1], close[i]) if os[i] == 1 else spt)
            # Running minimum: extended on a cross or in state 0, otherwise
            # reset to spt.
            min_val[i] = min(close[i], min_val[i - 1]) if _cross(i, spt) else (min(min_val[i - 1], close[i]) if os[i] == 0 else spt)

        return (max_val + min_val) / 2

    # Calculate tenkan, kijun, senkouA, and senkouB
    tenkan = avg(tenkan_len, tenkan_mult)
    kijun = avg(kijun_len, kijun_mult)
    senkouA = (kijun + tenkan) / 2
    senkouB = avg(spanB_len, spanB_mult)

    # Only the two senkou lines are shifted by `offset`; kijun is not.
    return pd.Series(senkouA).shift(offset), pd.Series(senkouB).shift(offset), kijun

def chaikin_volatility(high, low, length=10, roc_length=12):
    """Percentage rate of change of the EMA-smoothed high-low range.

    Args:
        high, low: pandas Series of bar highs and lows.
        length: Span of the exponential moving average of ``high - low``.
        roc_length: Number of rows back the smoothed range is compared to.

    Returns:
        pandas Series of percentage changes; the first ``roc_length`` rows
        are NaN because there is no earlier value to compare against.
    """
    bar_range = high - low
    smoothed_range = bar_range.ewm(span=length).mean()

    return smoothed_range.pct_change(roc_length) * 100

In [17]:
# Attach the indicator columns to the price frame, all computed from the mid prices.
df['atr'] = atr(high=df['Mid_High'], low=df['Mid_Low'], close=df['Mid_Close'])
df['abs_strength_bull'], df['abs_strength_bear'] = absolute_strength(close=df['Mid_Close'])
df['senkouA'], df['senkouB'], df['kijun'] = super_ichi(
    high=df['Mid_High'],
    low=df['Mid_Low'],
    close=df['Mid_Close'],
)
df['chaikin_volatility'] = chaikin_volatility(high=df['Mid_High'], low=df['Mid_Low'])

In [19]:
# Inspect the most recent rows of the two shifted senkou lines.
df.loc[:, ['Date', 'senkouA', 'senkouB']].tail(20)

Unnamed: 0,Date,senkouA,senkouB
1542,2023-08-28 01:00:00+00:00,1.089822,1.094044
1543,2023-08-28 05:00:00+00:00,1.089822,1.094044
1544,2023-08-28 09:00:00+00:00,1.089822,1.094044
1545,2023-08-28 13:00:00+00:00,1.089822,1.094044
1546,2023-08-28 17:00:00+00:00,1.088903,1.093284
1547,2023-08-28 21:00:00+00:00,1.088504,1.092925
1548,2023-08-29 01:00:00+00:00,1.088149,1.092665
1549,2023-08-29 05:00:00+00:00,1.088086,1.092651
1550,2023-08-29 09:00:00+00:00,1.086631,1.090863
1551,2023-08-29 13:00:00+00:00,1.084445,1.088514
