In [None]:
from trading_ig.config import config
import warnings
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import scale, StandardScaler, OneHotEncoder, LabelEncoder
import tensorflow as tf
from tensorflow.keras.models import load_model
import logging
from trading_ig import (IGService, IGStreamService)
from trading_ig.lightstreamer import Subscription
from pandas import json_normalize
from sqlalchemy import create_engine
import matplotlib.animation as animation
from matplotlib import style
import datetime as dt
from datetime import timedelta
import requests_cache
from model import data_preprocessing, buy_sell_prediction, buy_sell_prediction_model, price_prediction, price_prediction_model
from analysis import pivot_point
from analysis import technical_analysis, indications

from tensorflow.keras.utils import to_categorical


warnings.filterwarnings("ignore")

In [None]:
#Analysis

#Indicator functions: each takes the IG price data (the DataFrame plus its close/high/low columns) and adds its results to the DataFrame as new columns
#stoch period & rsi period are set by default to 5 and 14 respectively but can be redefined when calling the function
def technical_analysis(df, close, high, low, stoch_period = 5, rsi_period = 14):
    """Add every technical-indicator column to ``df`` in place.

    Runs, in order, the pivot-point levels, the slow stochastic, the RSI and
    the MACD calculations; each helper writes its own columns onto ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that receives the indicator columns.
    close, high, low : pandas.Series
        Price series the indicators are computed from.
    stoch_period : int
        Look-back window of the stochastic high/low range (default 5).
    rsi_period : int
        Smoothing period of the RSI (default 14).
    """
    # (indicator function, positional arguments after df) -- the order is
    # kept fixed so the resulting column order never changes.
    indicator_steps = (
        (pivot_point, (close, high, low)),
        (stochastic, (close, high, low, stoch_period)),
        (rsi, (close, rsi_period)),
        (macd, (close,)),
    )
    for add_indicator, extra_args in indicator_steps:
        add_indicator(df, *extra_args)

def indications(df):
    """Derive the signal and label columns from the indicator columns, in place.

    Runs the MACD, RSI and stochastic/RSI signal rules and then
    ``price_action`` (which averages the signal columns the first three add),
    finally dropping every row that still contains a NaN, e.g. the leading
    rows for which the rolling indicators have no value yet.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that already holds the columns added by ``technical_analysis``.
    """
    signal_steps = (macd_analysis, rsi_analysis, stochastic_rsi_analysis, price_action)
    for run_step in signal_steps:
        run_step(df)

    # Modify the caller's frame directly; callers rely on the in-place drop.
    df.dropna(inplace = True)

def pivot_point(df, close, high, low):
    """Add classic (floor) pivot-point levels to ``df`` in place.

    These levels are used, together with the High, Low and Open prices, as
    features for the close-price prediction model.

    Columns written (in this order):
        P  = (close + high + low) / 3
        R1 = 2P - low          S1 = 2P - high
        R2 = P + (high - low)  S2 = P - (high - low)

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that receives the ``P``, ``R1``, ``R2``, ``S1``, ``S2`` columns.
    close, high, low : pandas.Series
        Price series the levels are computed from.
    """
    candle_range = high - low
    pivot = (close + high + low) / 3

    levels = {
        'P': pivot,
        'R1': pivot * 2 - low,
        'R2': pivot + candle_range,
        'S1': pivot * 2 - high,
        'S2': pivot - candle_range,
    }
    for column_name, values in levels.items():
        df[column_name] = values


def stochastic(df, close, high, low, stoch_period):
    """Add slow-stochastic ``%K`` and ``%D`` columns to ``df`` in place.

    The raw (fast) %K is the close's position inside the ``stoch_period``
    lowest-low / highest-high range, scaled to 0-100. The stored ``%K`` is a
    3-period moving average of that raw value, and ``%D`` is a further
    3-period moving average of the stored ``%K``.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that receives the ``%K`` and ``%D`` columns.
    close, high, low : pandas.Series
        Price series.
    stoch_period : int
        Rolling window for the high/low range.
    """
    window_low = low.rolling(window = stoch_period).min()
    window_high = high.rolling(window = stoch_period).max()
    position_in_range = (close - window_low) / (window_high - window_low)
    raw_k = 100 * position_in_range

    smoothed_k = raw_k.rolling(window = 3).mean()
    df['%K'] = smoothed_k
    df['%D'] = smoothed_k.rolling(window = 3).mean()

def rsi(df, close, rsi_period):
    """Add the Relative Strength Index as column ``RSI`` of ``df`` in place.

    Gains and losses of the one-step close change are smoothed with an
    exponential mean (``com = rsi_period - 1``), and no value is produced
    until ``rsi_period`` changes have been observed.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that receives the ``RSI`` column.
    close : pandas.Series
        Close prices.
    rsi_period : int
        Smoothing period.
    """
    delta = close.diff(1)
    upward_moves = delta.clip(lower = 0)    # losses -> 0, NaN kept
    downward_moves = delta.clip(upper = 0)  # gains -> 0, NaN kept (values stay <= 0)

    smoothing = dict(com = rsi_period - 1, min_periods = rsi_period)
    mean_gain = upward_moves.ewm(**smoothing).mean()
    mean_loss = downward_moves.ewm(**smoothing).mean()

    # mean_loss is non-positive, hence the absolute value of the ratio
    relative_strength = (mean_gain / mean_loss).abs()
    df['RSI'] = 100 - (100 / (1 + relative_strength))

def macd(df, close, fast_length = 12, slow_length = 26, signal_smoothing = 9):
    """Add MACD, signal-line and histogram columns to ``df`` in place.

    MACD  = EMA(close, fast_length) - EMA(close, slow_length)
    MACDS = EMA(MACD, signal_smoothing)      (signal line)
    MACDH = MACD - MACDS                     (histogram)
    All EMAs use ``adjust=False`` (recursive exponential smoothing).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame that receives the ``MACD``, ``MACDS`` and ``MACDH`` columns.
    close : pandas.Series
        Close prices.
    fast_length, slow_length, signal_smoothing : int
        EMA spans; the defaults are the conventional 12 / 26 / 9 previously
        hard-coded here, so existing ``macd(df, close)`` calls are unchanged.
    """
    fast_ema = close.ewm(span = fast_length, adjust = False).mean()
    slow_ema = close.ewm(span = slow_length, adjust = False).mean()
    macd_line = fast_ema - slow_ema
    signal_line = macd_line.ewm(span = signal_smoothing, adjust = False).mean()

    df['MACD'] = macd_line
    df['MACDS'] = signal_line
    df['MACDH'] = macd_line - signal_line
    
def macd_analysis(df):
    """Encode the MACD / signal-line relationship in ``MADC_Indication``.

    Codes (shared with rsi_analysis and stochastic_rsi_analysis):
        2 - Buy   where MACD < MACDS
        0 - Sell  where MACD > MACDS
        1 - Hold  otherwise (equal values or NaN)

    NOTE(review): MACD below its signal line is coded as *buy* here, whereas
    stochastic_rsi_analysis requires MACDH > 0 (MACD above signal) for a buy.
    Confirm the contrarian reading is intended before changing it -- the
    buy/sell model was presumably trained on features derived from this column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``MACD`` and ``MACDS`` columns; modified in place.
    """
    df.loc[df['MACD'] < df['MACDS'], 'MADC_Indication'] = 2
    df.loc[df['MACD'] > df['MACDS'], 'MADC_Indication'] = 0
    # Assign the filled column back: fillna(inplace=True) on df[...] is chained
    # assignment and stops updating df under pandas Copy-on-Write.
    df['MADC_Indication'] = df['MADC_Indication'].fillna(1)

def rsi_analysis(df):
    """Encode overbought / oversold RSI levels in ``RSI_Divagence_Convergence``.

    Codes: 0 - Sell where RSI >= 70 (overbought), 2 - Buy where RSI <= 30
    (oversold), 1 - Hold otherwise (including NaN RSI).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with an ``RSI`` column; modified in place.
    """
    df.loc[df['RSI'] >= 70, 'RSI_Divagence_Convergence'] = 0
    df.loc[df['RSI'] <= 30, 'RSI_Divagence_Convergence'] = 2
    # Assign back instead of a chained fillna(inplace=True), which no longer
    # updates df under pandas Copy-on-Write.
    df['RSI_Divagence_Convergence'] = df['RSI_Divagence_Convergence'].fillna(1)

def stochastic_rsi_analysis(df):
    """Encode stochastic crossovers confirmed by RSI and MACD in ``SR_Indication``.

    Codes:
        0 - Sell  where %K > %D, %K >= 80 (overbought), RSI >= 70 and MACDH < 0
        2 - Buy   where %K < %D, %K <= 20 (oversold),   RSI <= 30 and MACDH > 0
        1 - Hold  otherwise
    RSI and MACDH are included to filter out noisy (false) stochastic signals.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``%K``, ``%D``, ``RSI`` and ``MACDH`` columns; modified in place.
    """
    sell = (df['%K'] > df['%D']) & (df['%K'] >= 80) & (df['RSI'] >= 70) & (df['MACDH'] < 0)
    # Every comparison is parenthesised on its own: '&' binds tighter than '<='.
    # The previous `(... & (df['%K']) <= 20)` grouping compared a boolean mask
    # with 20 (always True), so the %K < %D and %K <= 20 conditions were ignored.
    buy = (df['%K'] < df['%D']) & (df['%K'] <= 20) & (df['RSI'] <= 30) & (df['MACDH'] > 0)

    df.loc[sell, 'SR_Indication'] = 0
    df.loc[buy, 'SR_Indication'] = 2
    # Assign back instead of a chained fillna(inplace=True), which no longer
    # updates df under pandas Copy-on-Write.
    df['SR_Indication'] = df['SR_Indication'].fillna(1)
    
def price_action(df):
    """Combine the per-indicator signals into trade actions and labels, in place.

    Averages every column from ``MADC_Indication`` to the last column (the
    0 = sell / 1 = hold / 2 = buy signal columns added by macd_analysis,
    rsi_analysis and stochastic_rsi_analysis -- this relies on them being the
    trailing columns) and adds:

    * ``General_Action``  - 'Sell' / 'Buy' / 'Hold' when the mean signal
      (rounded to 3 decimals) is below / above / equal to 1;
    * ``Action_Buy``      - 1 on 'Buy' rows whose Close is the 15-row rolling
      minimum, else 0;
    * ``Action_Sell``     - 1 on 'Sell' rows whose Close is the 15-row rolling
      maximum, else 0;
    * ``Action_Hold``     - 1 on 'Hold' rows, else 0;
    * ``Distinct_Action`` - 'Buy' / 'Sell' / 'Hold' label for the buy/sell model.

    The intermediate ``Indication`` column and the three signal columns are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame with ``Close`` and the signal columns; modified in place.
    """
    df['Indication'] = df.loc[:, 'MADC_Indication':].mean(axis = 1).round(3)

    #General buy, sell and hold regions from the mean of the indications
    df.loc[df['Indication'] < 1, 'General_Action'] = 'Sell'
    df.loc[df['Indication'] > 1, 'General_Action'] = 'Buy'
    df.loc[df['Indication'] == 1, 'General_Action'] = 'Hold'

    #Keep only buy/sell signals that land on a 15-row price extreme, filtering erratic signals
    rolling_low = df['Close'].rolling(15).min()
    rolling_high = df['Close'].rolling(15).max()
    df.loc[(df['General_Action'] == 'Buy') & (df['Close'] == rolling_low), 'Action_Buy'] = 1
    df.loc[(df['General_Action'] == 'Sell') & (df['Close'] == rolling_high), 'Action_Sell'] = 1
    df.loc[df['General_Action'] == 'Hold', 'Action_Hold'] = 1
    # Assign the filled columns back: fillna(inplace=True) on df[...] is chained
    # assignment and no longer updates df under pandas Copy-on-Write.
    for action_column in ('Action_Buy', 'Action_Sell', 'Action_Hold'):
        df[action_column] = df[action_column].fillna(0)

    #Dropping all columns that are no longer necessary for analysis, backtesting and prediction
    df.drop(['Indication', 'MADC_Indication', 'RSI_Divagence_Convergence', 'SR_Indication'], inplace = True, axis =1)

    #Label column for the final buy/sell ML model
    df.loc[(df['Action_Buy'] == 0) & (df['Action_Sell'] == 1), 'Distinct_Action'] = 'Sell'
    df.loc[(df['Action_Buy'] == 1) & (df['Action_Sell'] == 0), 'Distinct_Action'] = 'Buy'
    df.loc[(df['Action_Buy'] == 0) & (df['Action_Sell'] == 0), 'Distinct_Action'] = 'Hold'

In [None]:
#Model App

#Declare models
# Forward slashes resolve on Windows and POSIX alike; the previous backslash
# paths contained the invalid escape sequence "\m" and only worked on Windows.
buy_sell_prediction_model = load_model("models/model_buy_sell_.h5")
price_prediction_model = load_model("models/model_price_model.h5")

def data_preprocessing(df, close, high, low,):
    """Add indicator, signal and label columns to ``df`` and drop incomplete rows.

    ``df`` is modified in place and also returned. ``close``/``high``/``low``
    are the price Series the indicators are computed from (callers pass
    ``df['Close']``, ``df['High']`` and ``df['Low']``).
    """
    technical_analysis(df, close, high, low)  # raw indicator columns
    indications(df)                           # signal and label columns
    df.dropna(inplace = True)                 # safety net; indications() already drops NaN rows
    return df

def buy_sell_prediction(df, model): #Does all the buy sell indications used by the buy & sell MLs
    """Predict a 'Buy' / 'Hold' / 'Sell' action for every row of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Preprocessed frame (see ``data_preprocessing``) containing the
        ``Action_Buy``, ``Action_Hold`` and ``Action_Sell`` feature columns.
    model :
        Object with a ``predict(X)`` method returning one row of class scores
        per input row; the score columns are assumed to be ordered Buy, Hold,
        Sell (the LabelEncoder's sorted order) -- TODO confirm against training.

    Returns
    -------
    numpy.ndarray
        The decoded action label for each row; callers use ``[-1]`` for the
        most recent data point.
    """
    # LabelEncoder sorts its classes, so indices 0/1/2 decode to Buy/Hold/Sell.
    le = LabelEncoder().fit(['Buy', 'Hold', 'Sell'])

    #Define the parameters used for prediction
    X = np.array(df[['Action_Buy', 'Action_Hold', 'Action_Sell']])

    # Standardise as before: scale(), then a StandardScaler fitted on the same data.
    # NOTE(review): both are fitted on the inference window itself rather than on
    # the training data, so the features depend on which rows are in the window.
    X = scale(X)
    X = StandardScaler().fit_transform(X)

    results = model.predict(X)
    # Take argmax of the raw scores: rounding them to one decimal first created
    # ties, which np.argmax always resolved in favour of index 0 ('Buy').
    return le.inverse_transform(np.argmax(results, axis = 1))

def price_prediction(df, model):
    """Predict the close price for every row of a preprocessed frame.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``Open``, ``High``, ``Low``, the pivot columns ``P``,
        ``R1``, ``R2``, ``S1``, ``S2`` and ``Close``.
    model :
        Object with ``predict(X)`` returning one standardised price per row
        (presumably shape (n, 1) -- TODO confirm).

    Returns
    -------
    numpy.ndarray
        Predictions mapped back to price units with the mean/std of
        ``df['Close']``, rounded to 2 decimal places.
    """
    feature_columns = ['Open', 'High', 'Low', 'P', 'R1', 'R2', 'S1', 'S2']
    features = StandardScaler().fit_transform(scale(np.array(df[feature_columns])))

    # The output scaler is fitted on the observed Close prices of this window
    # so the model's standardised output can be converted back into prices.
    close_scaler = StandardScaler()
    close_scaler.fit(np.array(df[['Close']]).reshape(-1, 1))

    standardised_prices = model.predict(features)
    return close_scaler.inverse_transform(standardised_prices).round(2)


In [None]:
#Old Streaming App

#Declare the db: SQLite file that keeps the streamed candles between updates
db_engine = create_engine('sqlite:///streaming_db.db', echo=False)

def streaming_func(df, engine = db_engine):
    """Merge a streamed record with stored history, predict, and persist the latest window.

    Parameters
    ----------
    df : pandas.DataFrame
        Latest streamed record(s) from ``on_prices_update``, indexed by a
        ``"%d/%m/%Y, %H:%M:%S"`` DateTime string, with ``UTM``, ``Open``,
        ``High``, ``Low`` and ``Close`` columns.
    engine :
        SQLAlchemy engine holding the ``streaming_data`` table.
    """
    history_loaded = False
    try:
        #Checking if the table exists. If so merge with the current data set. If not, proceed
        past_data = pd.read_sql("select * from streaming_data;", con = engine, index_col = 'DateTime')
    except Exception as exc:
        # Expected on the first update, before the table exists. Logged instead of
        # silently swallowed so that other database errors remain visible.
        logging.getLogger(__name__).info('No stored streaming data loaded: %s', exc)
    else:
        history_loaded = True
        df = pd.concat([past_data, df], axis = 0)
        # Deduplicate before sorting so keep='last' reliably keeps the newest
        # record (concat places it after the stored rows).
        df = df.loc[~df.index.duplicated(keep = 'last')]
        # The index is a "%d/%m/%Y, %H:%M:%S" string whose text order is wrong
        # across month/year boundaries; sort chronologically on the parsed time.
        timestamps = pd.to_datetime(df.index, format = "%d/%m/%Y, %H:%M:%S")
        df = df.iloc[np.argsort(timestamps.values, kind = 'mergesort')]

    if df.shape[0] > 15: #Merged dataset needs more than 15 records for preprocessing and predictions

        #Ensure the number of historical values is set to 200 set max for predictions
        prediction_df = df.iloc[-200:].copy()
        prediction_df = data_preprocessing(prediction_df, prediction_df['Close'],
                                           prediction_df['High'], prediction_df['Low'])
        #Predict the trade action
        predicted_buy_or_sell = buy_sell_prediction(prediction_df, buy_sell_prediction_model)
        #Predict the possible close price
        predicted_price = price_prediction(prediction_df, price_prediction_model)
        #Display prediction
        print (f'Best Trading Action: {predicted_buy_or_sell[-1]}')
        #Added Recommended action from analysis in the app just a confirmation the model is doing the right thing
        print (f'Recommended Action: {prediction_df.Distinct_Action.iloc[-1]}')
        print (f'Possible Next Candle Closing Price: {predicted_price[-1]}')

    # Keep at most the 200 most recent records.
    df = df.iloc[-200:]
    if history_loaded:
        # df already contains every stored row, so overwrite the table; appending
        # re-inserted all previously stored rows on each update.
        df.to_sql('streaming_data', con = engine, if_exists = 'replace')
    else:
        # Nothing was read back: add the current record, replacing the table if
        # its structure no longer matches.
        try:
            df.to_sql('streaming_data', con = engine, if_exists = 'append')
        except Exception:
            df.to_sql('streaming_data', con = engine, if_exists = 'replace')

    print (df[['UTM', 'Close', 'High', 'Low']].iloc[-1]) #Return most recent streamed values
    
def on_prices_update(item_update):
    """Lightstreamer listener for the legacy streaming app.

    Ignores updates for a candle that is still forming. For a completed candle
    the update is converted to a one-row DataFrame indexed by the local receipt
    time and passed to ``streaming_func``.

    Parameters
    ----------
    item_update : dict
        Lightstreamer update whose ``'values'`` dict holds the subscribed fields
        (UTM, OFR_OPEN, OFR_HIGH, OFR_LOW, OFR_CLOSE, CONS_END); presumably
        delivered as strings, hence the int()/pd.to_numeric conversions.
    """
    # Per the IG streaming API reference, CONS_END is 1 once the candle for the
    # interval has ended and 0 while it is still forming (the newer db_func also
    # only predicts when CONS_END != 0). The previous `== 0` test processed only
    # unfinished candles, the opposite of the stated intent.
    if int(item_update["values"]["CONS_END"]) == 0:
        return

    #Convert received data set from lightstream from json to dataframe for processing
    df = json_normalize(item_update['values'])

    #Create datetime column and index current datetime the update was made
    df['DateTime'] = dt.datetime.now().strftime("%d/%m/%Y, %H:%M:%S")
    df.set_index("DateTime", inplace = True)
    df = df.rename(columns = {'OFR_OPEN':'Open', 'OFR_HIGH':'High', 'OFR_LOW':'Low', 'OFR_CLOSE':'Close'})

    #Ensure all values received are numerical for calculations
    df[['Open', 'High', 'Low', 'Close']] = df[['Open', 'High', 'Low', 'Close']].apply(pd.to_numeric)

    #Run the streaming function to merge, predict and store the streamed data
    streaming_func(df)
            
    
def Old_streaming_main():
    """Stream 5-minute chart candles for the configured epic (legacy app).

    Logs in to IG, connects the Lightstreamer client with the configured
    account, subscribes to the chart feed with ``on_prices_update`` as the
    listener and blocks until Enter is pressed. The stream is then
    disconnected, also when subscribing fails or the prompt is interrupted.
    """
    logging.basicConfig(level=logging.INFO)
    # logging.basicConfig(level=logging.DEBUG)

    ig_service = IGService(config.username, config.password, config.api_key, config.acc_type)

    ig_stream_service = IGStreamService(ig_service)
    ig_session = ig_stream_service.create_session()
    # Ensure configured account is selected. "Not found" is reported once, after
    # all accounts were checked; the previous loop printed it for every
    # non-matching account and left accountId undefined for an empty list.
    accountId = next(
        (account[u'accountId'] for account in ig_session[u'accounts']
         if account[u'accountId'] == config.acc_number),
        None,
    )
    if accountId is None:
        print('Account not found: {0}'.format(config.acc_number))
    ig_stream_service.connect(accountId)

    try:
        # Making a new Subscription in MERGE
        # https://labs.ig.com/streaming-api-reference
        subscription_prices = Subscription(mode="MERGE", items=['CHART:CC.D.NG.USS.IP:5MINUTE'],
                                           fields=["UTM" , "OFR_OPEN", "OFR_HIGH", "OFR_LOW", "OFR_CLOSE", "CONS_END"],)
        subscription_prices.addlistener(on_prices_update)

        # Registering the Subscription
        ig_stream_service.ls_client.subscribe(subscription_prices)

        input("{0:-^80}\n".format("HIT CR TO UNSUBSCRIBE AND DISCONNECT FROM     LIGHTSTREAMER"))
    finally:
        ig_stream_service.disconnect()
    

In [None]:
#Runs the legacy streaming app. Old_streaming_main blocks at its input() prompt
#until Enter is pressed, then disconnects from Lightstreamer.
Old_streaming_main()

In [None]:
#Streaming App

#Declare the db: 'sqlite://' (no path) is SQLAlchemy's in-memory SQLite URL, so the
#streamed history is not written to disk.
#NOTE(review): in-memory SQLite is per-connection; confirm the streaming callbacks
#always reuse the same pooled connection/thread, or history may appear empty.
db_engine = create_engine('sqlite://', echo=False)
    
def streaming_func(df, engine = db_engine):
    """Run the models on the merged stream history and persist its latest window.

    Parameters
    ----------
    df : pandas.DataFrame
        Merged history built by ``db_func`` (the rows read back from
        ``streaming_data`` plus the latest update; db_func only calls this
        once at least 26 records exist), with ``UTM``, ``Open``, ``High``,
        ``Low`` and ``Close`` columns.
    engine :
        SQLAlchemy engine holding the ``streaming_data`` table.
    """
    #Ensure the number of historical values is set to 200 set max for predictions
    prediction_df = df.iloc[-200:].copy()
    prediction_df = data_preprocessing(prediction_df, prediction_df['Close'],
                                        prediction_df['High'], prediction_df['Low'])
    #Predict the trade action
    predicted_buy_or_sell = buy_sell_prediction(prediction_df, buy_sell_prediction_model)
    #Predict the possible close price
    predicted_price = price_prediction(prediction_df, price_prediction_model)
    #Display prediction
    print (f'Best Trading Action: {predicted_buy_or_sell[-1]}')
    #Added Recommended action from analysis in the app just a confirmation the model is doing the right thing
    print (f'Recommended Action: {prediction_df.Distinct_Action.iloc[-1]}')
    print (f'Possible Next Candle Closing Price: {predicted_price[-1]}')

    # Keep at most the 200 most recent records. df already holds every stored
    # row, so the table is overwritten; appending re-inserted all previously
    # stored rows on every call and grew the table without bound.
    df = df.iloc[-200:]
    df.to_sql('streaming_data', con = engine, if_exists = 'replace')

    print (df[['UTM', 'Close', 'High', 'Low']].iloc[-1]) #Return most recent streamed values
    
def db_func(df, engine = db_engine):
    """Merge the latest streamed record with stored history, then store or predict.

    While fewer than 26 records exist, the merged history is only saved. After
    that, ``streaming_func`` runs the models on updates whose ``CONS_END`` is
    non-zero (per IG's streaming reference, the candle has ended). Other updates
    are not stored.

    Parameters
    ----------
    df : pandas.DataFrame
        One-row frame from ``on_prices_update``, indexed by a
        ``"%d/%m/%Y, %H:%M:%S"`` DateTime string and carrying ``CONS_END``.
    engine :
        SQLAlchemy engine holding the ``streaming_data`` table.
    """
    history_loaded = False
    try:
        #Checking if the table exists. If so merge with the current data set. If not, proceed
        past_data = pd.read_sql("select * from streaming_data;", con = engine, index_col = 'DateTime')
    except Exception as exc:
        # Expected on the first update, before the table exists. Logged instead of
        # silently swallowed so that other database errors remain visible.
        logging.getLogger(__name__).info('No stored streaming data loaded: %s', exc)
    else:
        history_loaded = True
        df = pd.concat([past_data, df], axis = 0)
        # Deduplicate before sorting so keep='last' reliably keeps the newest
        # record (concat places it after the stored rows).
        df = df.loc[~df.index.duplicated(keep = 'last')]
        # The index is a "%d/%m/%Y, %H:%M:%S" string whose text order is wrong
        # across month/year boundaries; sort chronologically on the parsed time.
        timestamps = pd.to_datetime(df.index, format = "%d/%m/%Y, %H:%M:%S")
        df = df.iloc[np.argsort(timestamps.values, kind = 'mergesort')]

    #Check if merged dataset has enough values for preprocessing and predictions
    if df.shape[0] < 26:
        if history_loaded:
            # df already contains every stored row: overwrite instead of appending,
            # which re-inserted all stored rows on each update.
            df.to_sql('streaming_data', con = engine, if_exists = 'replace')
        else:
            df.to_sql('streaming_data', con = engine, if_exists = 'append')

    elif int(df["CONS_END"].iloc[-1]) != 0:
        # Use the engine this function was given, not the module-level default.
        streaming_func(df, engine = engine)
        
def on_prices_update(item_update):
    """Lightstreamer listener: turn one chart update into a row and pass it to ``db_func``.

    The update's ``'values'`` dict becomes a one-row DataFrame indexed by the
    local receipt time (``"%d/%m/%Y, %H:%M:%S"``). The OFR_* offer-price fields
    are renamed to Open/High/Low/Close and converted to numbers.
    """
    offer_to_ohlc = {'OFR_OPEN':'Open', 'OFR_HIGH':'High', 'OFR_LOW':'Low', 'OFR_CLOSE':'Close'}
    ohlc_columns = list(offer_to_ohlc.values())

    frame = json_normalize(item_update['values'])
    frame['DateTime'] = dt.datetime.now().strftime("%d/%m/%Y, %H:%M:%S")
    frame = frame.set_index("DateTime").rename(columns = offer_to_ohlc)
    frame[ohlc_columns] = frame[ohlc_columns].apply(pd.to_numeric)

    db_func(frame)
    
def streaming_main():
    """Stream 5-minute chart candles for the configured epic into ``on_prices_update``.

    Logs in to IG, connects the Lightstreamer client with the configured
    account, subscribes to the chart feed and blocks until Enter is pressed.
    The stream is then disconnected, also when subscribing fails or the prompt
    is interrupted.
    """
    logging.basicConfig(level=logging.INFO)
    # logging.basicConfig(level=logging.DEBUG)

    ig_service = IGService(config.username, config.password, config.api_key, config.acc_type)

    ig_stream_service = IGStreamService(ig_service)
    ig_session = ig_stream_service.create_session()
    # Ensure configured account is selected. "Not found" is reported once, after
    # all accounts were checked; the previous loop printed it for every
    # non-matching account and left accountId undefined for an empty list.
    accountId = next(
        (account[u'accountId'] for account in ig_session[u'accounts']
         if account[u'accountId'] == config.acc_number),
        None,
    )
    if accountId is None:
        print('Account not found: {0}'.format(config.acc_number))
    ig_stream_service.connect(accountId)

    try:
        # Making a new Subscription in MERGE
        # https://labs.ig.com/streaming-api-reference
        subscription_prices = Subscription(mode="MERGE", items=['CHART:CC.D.NG.USS.IP:5MINUTE'],
                                           fields=["UTM" , "OFR_OPEN", "OFR_HIGH", "OFR_LOW", "OFR_CLOSE", "CONS_END"],)
        subscription_prices.addlistener(on_prices_update)

        # Registering the Subscription
        ig_stream_service.ls_client.subscribe(subscription_prices)

        input("{0:-^80}\n".format("HIT CR TO UNSUBSCRIBE AND DISCONNECT FROM     LIGHTSTREAMER"))
    finally:
        ig_stream_service.disconnect()

In [None]:
#Runs the current streaming app. streaming_main blocks at its input() prompt
#until Enter is pressed, then disconnects from Lightstreamer.
streaming_main()

In [None]:
#Back Testing App

# Module-level logger for the back-testing cell, opened up to DEBUG level.
logger = logging.getLogger(name=__name__)
logger.setLevel(level=logging.DEBUG)

def back_testing(df):
    """Run the preprocessing and both models over historical prices.

    Parameters
    ----------
    df : pandas.DataFrame
        Historical candles with ``Open``, ``High``, ``Low`` and ``Close``
        columns; it is modified in place by ``data_preprocessing``.

    Returns
    -------
    pandas.DataFrame
        ``Close``, ``Predicted_Price``, 0/1 columns ``Predicted_Action_Buy`` /
        ``_Sell`` / ``_Hold`` and the ``%K``, ``%D``, ``RSI``, ``MACD``,
        ``MACDS`` and ``MACDH`` indicator columns.
    """
    #Preprocess the data
    df = data_preprocessing(df, df['Close'], df['High'], df['Low'],)
    #Predict the possible buy sell signal
    predicted_buy_or_sell = buy_sell_prediction(df, buy_sell_prediction_model)
    #Predict the possible close price
    predicted_price = price_prediction(df, price_prediction_model)

    # Align the frame with each prediction array before attaching it. The explicit
    # copies give an independent frame, so the assignments below neither raise
    # SettingWithCopy warnings nor act on a view of the preprocessed data.
    df = df.iloc[-predicted_price.shape[0]:].copy()
    df['Predicted_Price'] = predicted_price
    df = df.iloc[-predicted_buy_or_sell.shape[0]:].copy()
    df['Buy_or_Sell_Action'] = predicted_buy_or_sell

    # One 0/1 indicator per predicted action (bar heights in the back-test plot).
    # Assigned directly instead of loc + chained fillna(inplace=True), which no
    # longer updates df under pandas Copy-on-Write.
    for action in ('Buy', 'Sell', 'Hold'):
        df[f'Predicted_Action_{action}'] = (df['Buy_or_Sell_Action'] == action).astype(float)

    #STOCH, RSI, MACD not currently being used for visualization but can be if need be in the future
    return df[['Close', 'Predicted_Price', 'Predicted_Action_Buy', 'Predicted_Action_Sell',
               'Predicted_Action_Hold', '%K', '%D', 'RSI', 'MACD', 'MACDS', 'MACDH']]
    

def back_testing_main():
    """Fetch recent IG prices, run the models over them and plot the result.

    Pulls the last ``num_points`` 5-minute candles for the epic, analyses the
    ask prices with ``back_testing`` and draws the predicted buy/sell/hold
    regions as bars, with the actual and predicted close prices on a twin y-axis.
    """
    logging.basicConfig(level=logging.DEBUG)

    expire_after = timedelta(hours=1)
    session = requests_cache.CachedSession(
        cache_name='cache', backend='sqlite', expire_after=expire_after
    )
    # set expire_after=None if you don't want cache expiration
    # set expire_after=0 if you don't want to cache queries

    #config = IGServiceConfig()

    # no cache
    ig_service = IGService(config.username, config.password, config.api_key, config.acc_type)

    # if you want to globally cache queries
    #ig_service = IGService(config.username, config.password, config.api_key, config.acc_type, session)

    ig_service.create_session()

    #epic = 'CS.D.EURUSD.MINI.IP'
    # NOTE(review): the old comment here said "US (SPY) - mini", but the epic
    # looks like natural gas ("NG") -- confirm the instrument.
    epic = 'CC.D.NG.USS.IP'

    #resolution = 'D'
    # see from pandas.tseries.frequencies import to_offset
    #resolution = 'H'
    resolution = '5Min'
    (start_date, end_date) = ('2020-01-23', '2020-01-25')

    num_points = 500 #Number of data points should be at least 50 to accommodate technical indications calculations
    response = ig_service.fetch_historical_prices_by_epic_and_num_points(epic, resolution, num_points)

    #response = ig_service.fetch_historical_prices_by_epic_and_date_range(epic, resolution, start_date, end_date)

    # if you want to cache this query
    #response = ig_service.fetch_historical_prices_by_epic_and_date_range(epic, resolution, start_date, end_date, session)

    #response = ig_service.fetch_historical_prices_by_epic_and_num_points(epic, resolution, num_points, session)

    df_ask = response['prices']['ask']

    df_ask = back_testing(df_ask)#Analyse the received data set

    #Graphing the historical close price trends, predicted prices and buy sell points
    fig, ax = plt.subplots()
    fig.set_size_inches(22, 15, forward=True)
    fig.set_facecolor('xkcd:white')

    #Visualize the buy-sell indication to identify buy sell regions based on the predictions
    df_ask['Predicted_Action_Buy'].plot(ax = ax, alpha = 0.3, kind = 'bar', color = 'green', label = 'Buy')
    df_ask['Predicted_Action_Sell'].plot(ax = ax, alpha = 0.3, kind = 'bar', color = 'red', label = 'Sell')
    df_ask['Predicted_Action_Hold'].plot(ax = ax, alpha = 0.3, kind = 'bar', color = 'white', label = 'Hold')
    ax.legend(loc = 2)#Buy-Sell-Hold legend in the upper left corner

    ax2 = ax.twinx()#Second y-axis sharing the same x positions for the price lines
    ax2.plot(ax.get_xticks(), df_ask['Close'], alpha = 0.85, color = 'orange', label = 'Actual Price')
    ax2.plot(ax.get_xticks(), df_ask['Predicted_Price'], alpha = 0.85, color = 'blue', label = 'Predicted Price')
    ax2.legend(loc = 1)#Actual/Predicted price legend in the upper right corner

    ax.set_title('Historical Price Movement')
    # twinx() hides ax2's x-axis, so the x label must go on ax (plt.xlabel targeted
    # ax2 and never rendered); prices are drawn against ax2's y-axis.
    ax.set_xlabel('DateTime')
    ax2.set_ylabel('Price')

    plt.show()#Display the graph

In [None]:
#Runs the back-testing app: fetches historical prices from IG, runs both models on
#them and plots actual vs predicted prices with the predicted buy/sell/hold regions.
back_testing_main()