In [19]:
import numpy as np
import pandas as pd
import yfinance as yf
from stable_baselines3 import PPO
from enum import Enum
from sklearn.preprocessing import MinMaxScaler
from finta import TA

In [20]:
# Load a saved PPO checkpoint; the path is relative to the notebook's working directory.
# NOTE(review): checkpoint loading deserialises stored objects — only load files from a trusted source.
model = PPO.load('saved_models/best_model_800000_LSTM4.zip')

In [21]:
class Positions(int, Enum):
    """Integer-valued position codes used by the indicator helpers and the prediction cell.

    The ``int`` mix-in makes every member compare equal to its plain integer
    value (for example ``Positions.LONG == 1``).
    """
    # Emitted by rsi_indicator when RSI is above its upper threshold; reported as 'SELL'.
    SHORT = 0
    # Emitted by rsi_indicator when RSI is below its lower threshold; reported as 'BUY'.
    LONG = 1
    # Emitted by rsi_indicator when RSI is between the two thresholds.
    HOLD = 2
    # Not produced by any helper in this notebook.
    TAKE = 3

In [22]:
def getTickerData(ticker, period, interval):
    """Download price history for ``ticker`` through yfinance.

    ``period`` and ``interval`` are forwarded unchanged to ``yf.download``.
    The result is wrapped in a DataFrame and its index is moved into an
    ordinary column by ``reset_index``, so rows carry a default integer index.
    """
    raw_history = yf.download(tickers=ticker, period=period, interval=interval)
    return pd.DataFrame(raw_history).reset_index()

In [23]:
def calculate_percentage_increase(final_value, starting_value):
    """Return the percentage change from ``starting_value`` to ``final_value``.

    Computes ``100 * (final_value - starting_value) / starting_value``.

    Returns:
        The percentage change, or ``None`` when it cannot be computed because
        ``starting_value`` is zero or an operand is not numeric. In that case
        both inputs are printed to help trace the offending values.
    """
    try:
        return 100 * ((final_value - starting_value) / starting_value)
    except (ZeroDivisionError, TypeError):
        # Only the expected arithmetic failures are handled; a bare ``except``
        # would also swallow KeyboardInterrupt/SystemExit raised in the kernel.
        print(final_value, starting_value)
        return None

In [24]:
def supertrend_indicator(df, atr_period, multiplier):
    """Compute the lower and upper Supertrend bands for an OHLC frame.

    Args:
        df: DataFrame with 'high', 'low' and 'close' columns. Any index is
            accepted; rows are processed in their stored order.
        atr_period: Smoothing period of the average true range. The ATR is an
            exponentially weighted mean with ``alpha = 1 / atr_period`` and
            ``min_periods = atr_period``, so earlier rows have NaN bands.
        multiplier: Number of ATRs added to / subtracted from the bar midpoint.

    Returns:
        ``(final_lowerband, final_upperband)``: two float Series on ``df.index``.
        For every row after the first, the lower band is set to NaN while the
        trend is down and the upper band is set to NaN while the trend is up.
        The first row keeps its raw band values.
    """
    high = df['high']
    low = df['low']
    close = df['close']

    # True range: the largest absolute gap among the three candidate spans.
    previous_close = close.shift()
    price_diffs = [high - low,
                   high - previous_close,
                   previous_close - low]
    true_range = pd.concat(price_diffs, axis=1).abs().max(axis=1)
    atr = true_range.ewm(alpha=1 / atr_period, min_periods=atr_period).mean()
    hl2 = (high + low) / 2

    # Work on positional NumPy arrays: the loop counters are positions, so
    # using them as index labels would break on any non 0..n-1 index.
    close_values = close.to_numpy(dtype=float)
    upper = (hl2 + (multiplier * atr)).to_numpy(dtype=float, copy=True)
    lower = (hl2 - (multiplier * atr)).to_numpy(dtype=float, copy=True)

    uptrend = True  # the trend is assumed to be up on the first row
    for curr in range(1, len(close_values)):
        prev = curr - 1

        if close_values[curr] > upper[prev]:
            uptrend = True
        elif close_values[curr] < lower[prev]:
            uptrend = False
        else:
            # Trend unchanged: the active band may only tighten, never loosen.
            if uptrend and lower[curr] < lower[prev]:
                lower[curr] = lower[prev]
            if not uptrend and upper[curr] > upper[prev]:
                upper[curr] = upper[prev]

        # Blank out the band that is not active in the current trend.
        if uptrend:
            upper[curr] = np.nan
        else:
            lower[curr] = np.nan

    final_lowerband = pd.Series(lower, index=df.index)
    final_upperband = pd.Series(upper, index=df.index)
    return final_lowerband, final_upperband

In [25]:
def rsi_indicator(df, period=14, overbought=60, oversold=40):
    """Turn the RSI of an OHLC frame into per-row position signals.

    Args:
        df: DataFrame with 'open', 'high', 'low' and 'close' columns.
        period: RSI look-back passed to ``TA.RSI``.
        overbought: RSI values strictly above this yield ``Positions.SHORT``.
            The notebook uses 60 instead of the conventional 70.
        oversold: RSI values strictly below this yield ``Positions.LONG``.
            The notebook uses 40 instead of the conventional 30.

    Returns:
        ``(signals, buy_signal, sell_signal)``:
        ``signals`` is a list of ``Positions`` members, one per RSI value.
        Rows whose RSI is NaN or between the thresholds get ``Positions.HOLD``.
        ``buy_signal`` and ``sell_signal`` are parallel lists of bools marking
        the LONG and SHORT rows respectively.
    """
    rsi = TA.RSI(df[['open', 'high', 'low', 'close']], period)

    signals = []
    # Iterate over the values so the result does not depend on index labels.
    for value in rsi:
        if value > overbought:
            signals.append(Positions.SHORT)
        elif value < oversold:
            signals.append(Positions.LONG)
        else:
            signals.append(Positions.HOLD)

    # Compare against the enum members. The previous check for -1 could never
    # match because SHORT is 0, which left sell_signal permanently all-False.
    buy_signal = [signal == Positions.LONG for signal in signals]
    sell_signal = [signal == Positions.SHORT for signal in signals]

    return signals, buy_signal, sell_signal

In [26]:
def swing_detection(index, df):
    """Flag swing highs/lows and carry the most recent swing price forward.

    For every row label ``ci`` that is at least ``2 * index - 1``, the row at
    ``ci - index`` is the candidate pivot. It is tested against the rows
    ``ci - k`` for ``k`` in ``0 .. 2 * index - 2`` (``k == index`` is the pivot
    itself and is skipped):

    * rows with ``k < index`` must not exceed the pivot (strict comparison);
    * rows with ``k > index`` must not reach the pivot (non-strict comparison).

    The flag is stored at the pivot's position, so a flag at position ``p``
    depends on rows up to ``p + index``.

    Row labels are used both for look-ups and as list positions, so this
    assumes ``df`` has a default 0..n-1 integer index.

    Returns:
        ``(sh, sl, sh_nums, sl_nums)``: boolean swing-high and swing-low flags,
        and the last flagged high/low price per row (0 until the first flag).
    """
    lookback = (index * 2) - 1
    offsets = range(lookback)
    highs = df['high']
    lows = df['low']

    # Leading padding so that each flag lines up with its pivot row.
    sh = [False] * (index - 1)
    sl = [False] * (index - 1)

    for ci in df.index:
        if ci < lookback:
            continue

        pivot_high = highs[ci - index]
        pivot_low = lows[ci - index]

        high_beaten = any(
            (k < index and highs[ci - k] > pivot_high)
            or (k > index and highs[ci - k] >= pivot_high)
            for k in offsets
        )
        low_beaten = any(
            (k < index and lows[ci - k] < pivot_low)
            or (k > index and lows[ci - k] <= pivot_low)
            for k in offsets
        )

        # With an empty comparison window nothing is ever flagged.
        sh.append(bool(offsets) and not high_beaten)
        sl.append(bool(offsets) and not low_beaten)

    # Trailing padding: the last `index` rows cannot be pivots yet.
    sh.extend([False] * index)
    sl.extend([False] * index)

    last_high = 0
    last_low = 0
    sh_nums = []
    sl_nums = []
    for label in df.index:
        if sh[label]:
            last_high = highs[label]
        if sl[label]:
            last_low = lows[label]
        sh_nums.append(last_high)
        sl_nums.append(last_low)

    return sh, sl, sh_nums, sl_nums

In [27]:
def money_flow_index_indicator(df, period=14):
    """Compute the Money Flow Index (MFI) of a price/volume frame.

    Args:
        df: DataFrame with 'high', 'low', 'close' and 'volume' columns. Any
            index is accepted; rows are compared with their stored predecessor.
        period: Rolling window length for the positive/negative flow sums.

    Returns:
        A NumPy array with one MFI value per row. The first ``period - 1`` rows
        are NaN. A window with no negative flow yields 100.

    Side effects:
        The result is also stored in ``df['MFI']``, as before. No other column
        of ``df`` is created, overwritten or dropped.
    """
    # Typical price and the raw money flow it carries for each row.
    typical_price = (df['high'] + df['low'] + df['close']) / 3
    raw_money_flow = typical_price * df['volume']

    # A row's flow counts as positive when its typical price rose against the
    # previous row, and as negative when it fell. The first row and unchanged
    # prices contribute 0 to both.
    previous_price = typical_price.shift()
    positive_flow = raw_money_flow.where(typical_price > previous_price, 0.0)
    negative_flow = raw_money_flow.where(typical_price < previous_price, 0.0)

    money_flow_ratio = (
        positive_flow.rolling(window=period).sum()
        / negative_flow.rolling(window=period).sum()
    )

    df['MFI'] = 100 - (100 / (1 + money_flow_ratio))

    return df['MFI'].values

In [28]:
def produce_prediction(df, window):
    """Return 1 where 'close' is at least the 'close' from ``window`` rows earlier, else 0.

    Rows without an earlier value to compare against (the first ``window``
    rows for a positive ``window``) evaluate to 0.
    """
    closes = df['close']
    earlier_closes = closes.shift(window)
    not_lower = earlier_closes.le(closes)
    return not_lower.astype(int)

In [29]:
def preprocess_data(df):
    """Build the model's feature frame from a lowercase OHLCV DataFrame.

    Adds these columns, in this order:
    'close_binary' (1 when the close did not fall against the previous row),
    'mfi' (Money Flow Index, min-max scaled to [0, 1] over the returned rows),
    'sh_nums' / 'sl_nums' (last detected swing high / low price),
    'supertrend' (``Positions.LONG`` where the lower Supertrend band is defined,
    otherwise ``Positions.SHORT``) and 'rsi_signals' (``Positions`` members).

    The raw 'volume', 'Adj Close', 'open', 'high', 'low', 'date' and 'MFI'
    columns are removed when present, and rows containing NaN are dropped.

    Args:
        df: DataFrame with 'open', 'high', 'low', 'close' and 'volume' columns.
            It is not modified; all work happens on a copy.

    Returns:
        A new DataFrame holding 'close' (and any other untouched input columns)
        plus the feature columns listed above.
    """
    # Work on a copy so the caller's frame is left untouched (the MFI helper
    # also writes an 'MFI' column into whatever frame it is given).
    df = df.copy()

    mfi_indicator = money_flow_index_indicator(df)
    close_b = produce_prediction(df, 1)
    rsi_signals, _, _ = rsi_indicator(df)
    _, _, sh_nums, sl_nums = swing_detection(5, df)
    final_lowerband, _ = supertrend_indicator(df, 10, 3)
    supertrend_positions = [
        Positions.SHORT if np.isnan(lowerband) else Positions.LONG
        for lowerband in final_lowerband
    ]

    df['close_binary'] = close_b
    df['mfi'] = mfi_indicator
    df['sh_nums'] = sh_nums
    df['sl_nums'] = sl_nums
    df['supertrend'] = supertrend_positions
    df['rsi_signals'] = rsi_signals

    # errors='ignore': some of these columns are optional. For example,
    # 'Adj Close' and 'date' depend on what the data source returned.
    df = df.drop(
        columns=["volume", "Adj Close", "open", "high", "low", "date", "MFI"],
        errors="ignore",
    )
    df = df.dropna()

    scaler = MinMaxScaler()
    df[['mfi']] = scaler.fit_transform(df[['mfi']])

    return df

In [30]:
# Download the 'max' period of '1d'-interval history for 'btc-usd' (network call through yfinance).
df = getTickerData('btc-usd', 'max', '1d')
# Map the capitalised column names to the lowercase names the indicator helpers read.
# Both 'Datetime' and 'Date' map to 'date'; rename skips keys that are not present.
df = df.rename(columns={'Close': 'close', 'Open': 'open', 'High': 'high', 'Low': 'low', 'Volume': 'volume', 'Datetime': 'date', 'Date': 'date'})
# Replace the raw frame with the feature frame expected by the model.
df = preprocess_data(df)
# Drop the last row.
# NOTE(review): presumably because the most recent daily candle is still forming — confirm.
df = df[0:df.shape[0]-1]

[*********************100%***********************]  1 of 1 completed


In [31]:
# Get the model's prediction for the current market data: feed it the last 89 feature rows
# and keep element [0] of what predict() returns.
# NOTE(review): 89 presumably matches the observation window the model was trained with — confirm.
prediction = model.predict(df[['mfi', 'close_binary', 'supertrend', 'rsi_signals']].tail(89))[0]
# Default label, used when the prediction equals neither LONG nor SHORT (e.g. HOLD or TAKE).
position_to_take = 'hold'
if prediction == Positions.LONG:
    position_to_take = 'BUY'
elif prediction == Positions.SHORT:
    position_to_take = 'SELL'
print('position_to_take:', position_to_take)

position_to_take: BUY
