In [1]:
import numpy as np 
import pandas as pd 
import datetime as datetime
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from IPython.display import clear_output,display
from datetime import *
from time import *
from ib_async import *

In [None]:
class LiveMACrossoverBuy:
    '''Long-only EMA crossover strategy run live against the IB API.

    Constructing an instance runs a complete session: it connects, subscribes
    to streaming 15-second bars, plots close price and both EMAs, trades on
    every bar update for ``run_seconds`` seconds, disconnects and finally
    prints / plots the realised cumulative return.

    Parameters
    ----------
    symbol : str
        Ticker passed to ``Stock(..., exchange='SMART', currency='USD')``.
    qty : int or float
        Quantity used for every market order.
    ema_short, ema_long : int
        Spans of the fast and slow exponential moving averages.
    port : int
        Port handed to ``IB.connect`` (default 7497, as in the original code).
    client_id : int
        Client id handed to ``IB.connect`` (default 1).
    run_seconds : float
        How long the callback stays subscribed before disconnecting.
    '''

    def __init__(self, symbol, qty, ema_short = 11, ema_long = 22,
                 port = 7497, client_id = 1, run_seconds = 10):
        self.symbol = symbol
        self.qty = qty
        self.ema_short = ema_short
        self.ema_long = ema_long
        self.port = port
        self.client_id = client_id
        self.run_seconds = run_seconds
        self.position = 0
        self.cumulative_return = 1.0
        self.trades = []
        self.cumulative_returns_history = []
        self.timestamps = []
        # Re-entrancy guard: set while an order is waiting for its fill.
        self._order_pending = False

        self.establish_connection()
        try:
            self.get_price_data()
            self.bars.updateEvent.clear()
            self.calc_technical_indicators()
            self.fig = self.plot_graph(self.df)
            display(self.fig)
            self.start_trading()
        except Exception:
            # Do not leave the client id occupied when setup or trading fails.
            if self.ib.isConnected():
                self.ib.disconnect()
            raise
        self.summary()

    def establish_connection(self):
        '''Start the notebook event loop and connect to the IB API.

        Raises
        ------
        Exception
            Whatever ``IB.connect`` raised; it is re-raised after being
            reported so the session does not continue without a connection.
        '''
        util.startLoop()
        self.ib = IB()
        try:
            self.ib.connect(port = self.port, clientId = self.client_id)
        except Exception as e:
            print(f"Error connecting to IB API : {e} ")
            raise
        print("Successfully connected to IB API")

    def get_price_data(self):
        '''Qualify the contract and request one day of streaming 15-second bars.

        Stores the qualified contract in ``self.contract``, the live bar list
        in ``self.bars`` and its DataFrame form in ``self.df``.

        Raises
        ------
        ValueError
            If the symbol cannot be qualified or no bars are returned.
        '''
        stock = Stock(symbol = self.symbol, exchange = 'SMART', currency = 'USD')
        qualified = self.ib.qualifyContracts(stock)
        if not qualified or qualified[0] is None:
            raise ValueError(f"Could not qualify contract for symbol {self.symbol!r}")
        self.contract = qualified[0]

        self.bars = self.ib.reqHistoricalData(
            contract = self.contract,
            endDateTime = '',
            durationStr = '1 D',
            barSizeSetting = '15 secs',
            whatToShow = 'TRADES',
            useRTH = False,
            keepUpToDate = True,
        )
        self.df = util.df(self.bars)
        if self.df is None or len(self.df) == 0:
            raise ValueError(f"No price data returned for {self.contract.symbol}")
        print(f"Price data successfully retrieved for {self.contract.symbol}")

    def calc_technical_indicators(self):
        '''Add the ``EMA <short>`` and ``EMA <long>`` columns to ``self.df``.'''
        close = self.df['close']
        self.df[f'EMA {self.ema_short}'] = close.ewm(span = self.ema_short, adjust = False).mean()
        self.df[f'EMA {self.ema_long}'] = close.ewm(span = self.ema_long, adjust = False).mean()
        self.df = self.df.dropna()

    def generate_signals(self):
        '''Return ``(buy_signal, sell_signal)`` for the most recent row.

        Buy  : short EMA >= long EMA while flat.
        Sell : short EMA <= long EMA while holding a position.
        Both values are plain Python booleans.
        '''
        short_ema = self.df[f'EMA {self.ema_short}'].iloc[-1]
        long_ema = self.df[f'EMA {self.ema_long}'].iloc[-1]

        buy_signal = bool(short_ema >= long_ema) and self.position == 0
        sell_signal = bool(short_ema <= long_ema) and self.position > 0
        return buy_signal, sell_signal

    def _wait_for_fill_price(self, trade, fallback_price, timeout = 5.0, poll_interval = 0.25):
        '''Poll ``trade`` until it is done (or ``timeout`` elapses) and return a usable price.

        Returns the average fill price when one is available, ``None`` when the
        order finished without any fill, and ``fallback_price`` when the order
        is still working after the timeout (an estimate, reported on stdout).
        '''
        polls = max(1, int(timeout / poll_interval))
        for _ in range(polls):
            if trade.isDone():
                break
            # ib.sleep keeps the event loop running so order status can update.
            self.ib.sleep(poll_interval)

        fill_price = trade.orderStatus.avgFillPrice
        if fill_price and fill_price > 0:
            return fill_price
        if trade.isDone():
            return None
        print(f"Order not filled after {timeout}s, using last close {fallback_price} as estimate")
        return fallback_price

    def _submit_market_order(self, action, fallback_price):
        '''Place a market order for ``self.qty`` and return its price (see ``_wait_for_fill_price``).'''
        self._order_pending = True
        try:
            trade = self.ib.placeOrder(
                contract = self.contract,
                order = MarketOrder(action = action, totalQuantity = self.qty),
            )
            return self._wait_for_fill_price(trade, fallback_price)
        finally:
            self._order_pending = False

    def _record_exit(self, exit_price, timestamp):
        '''Book the closing of the open long position and return the profit.

        Updates the cumulative return, appends the sell record and the
        return-history point, and resets ``self.position`` to 0.
        '''
        entry_price = self.trades[-1]['price']
        # Guard against a zero entry price so the return maths cannot divide by zero.
        trade_return = (exit_price - entry_price) / entry_price if entry_price else 0.0
        percentage_return = trade_return * 100
        profit = (exit_price - entry_price) * self.position

        self.cumulative_return *= (1 + trade_return)
        self.trades.append({'type': 'sell', 'price': exit_price, 'profit': profit,
                            'percentage_return': percentage_return})
        self.cumulative_returns_history.append(self.cumulative_return)
        self.timestamps.append(timestamp)
        self.position = 0
        return profit

    def execute_trades(self):
        '''Act on the latest signals: buy, sell, or report the current state.

        The fill price is read only after waiting for the order to complete;
        reading ``avgFillPrice`` straight after ``placeOrder`` normally yields
        0.0 because the order has not been filled yet.
        '''
        # A bar update can arrive while an earlier order is still waiting for
        # its fill; skip it so the same signal is not traded twice.
        if getattr(self, '_order_pending', False):
            return

        current_time = self.df['date'].iloc[-1]
        last_close = float(self.df['close'].iloc[-1])
        buy_signal, sell_signal = self.generate_signals()
        banner = '=' * 55

        if buy_signal:
            entry_price = self._submit_market_order('BUY', last_close)
            print(banner)
            if entry_price is None:
                print("BUY order ended without a fill, position unchanged")
            else:
                self.trades.append({'type': 'buy', 'price': entry_price, 'qty': self.qty})
                self.position = self.qty
                print(f"Buying {self.qty} unit of {self.contract.symbol} at {entry_price}")
            print(banner)

        elif sell_signal:
            exit_price = self._submit_market_order('SELL', last_close)
            print(banner)
            if exit_price is None:
                print("SELL order ended without a fill, position unchanged")
            else:
                profit = self._record_exit(exit_price, current_time)
                print(f"Closing long position for {self.qty} unit of {self.contract.symbol} "
                      f"at an average fill price of: {exit_price}, Profit : {profit}")
            print(banner)

        elif self.position > 0:
            print(banner)
            print("Open Long position, Hold")
            print(banner)

        else:
            print(banner)
            print("No position held, do nothing")
            print(banner)

    def _band_xy(self, data):
        '''Return the closed polygon (x, y) that shades the area between the two EMAs.'''
        dates = data['date'].tolist()
        long_ema = data[f'EMA {self.ema_long}'].tolist()
        short_ema = data[f'EMA {self.ema_short}'].tolist()
        # Forward along the long EMA, then back along the short EMA.
        return dates + dates[::-1], long_ema + short_ema[::-1]

    def plot_graph(self, data):
        '''Build the FigureWidget with close price, both EMAs and the shaded band.

        Trace order (relied on by ``update_plot``): 0 close, 1 short EMA,
        2 long EMA, 3 shaded band.
        '''
        fig = go.FigureWidget()

        line_specs = [
            ('close', 'Close Price', 'blue'),
            (f'EMA {self.ema_short}', f'EMA {self.ema_short}', 'aqua'),
            (f'EMA {self.ema_long}', f'EMA {self.ema_long}', 'red'),
        ]
        for column, label, colour in line_specs:
            fig.add_trace(go.Scatter(
                x = data['date'],
                y = data[column],
                mode = 'lines',
                name = label,
                line = dict(color = colour)
            ))

        band_x, band_y = self._band_xy(data)
        fig.add_trace(go.Scatter(
            x = band_x,
            y = band_y,
            fill = 'toself',
            fillcolor = 'rgba(128, 0, 128, 0.2)',
            line = dict(color = 'rgba(128, 0, 128, 0)'),
            showlegend = False
        ))

        fig.update_layout(
            title = 'EMA Crossover',
            xaxis_title = 'Index',
            yaxis_title = 'Price',
            legend_title = 'Legend',
            template = 'plotly_dark'
        )
        return fig

    def update_plot(self):
        '''Push the current ``self.df`` into the four traces of ``self.fig``.'''
        band_x, band_y = self._band_xy(self.df)
        columns = ['close', f'EMA {self.ema_short}', f'EMA {self.ema_long}']
        with self.fig.batch_update():
            for trace, column in zip(self.fig.data, columns):
                trace.x = self.df['date']
                trace.y = self.df[column]
            self.fig.data[3].x = band_x
            self.fig.data[3].y = band_y
        print("Plot data has been updated")

    def onBarUpdate(self, bars, hasNewBar):
        '''Callback for each bar update: recompute indicators, redraw, and trade.

        ``hasNewBar`` is not consulted, so every update triggers a signal check.
        '''
        self.df = util.df(bars)
        self.calc_technical_indicators()
        clear_output(wait = True)
        display(self.fig)
        self.update_plot()
        display(self.df.tail(2))
        self.execute_trades()

    def start_trading(self):
        '''Subscribe the callback, let it run for ``run_seconds``, then disconnect.

        The disconnect happens in a ``finally`` block so the connection is
        released even if the callback raises.
        '''
        self.bars.updateEvent += self.onBarUpdate
        try:
            self.ib.sleep(self.run_seconds)
        finally:
            self.bars.updateEvent.clear()
            self.ib.disconnect()
            print("Disconnected from IB API. Trading stopped")

    def calculate_pcreturns(self):
        '''Print and return the final cumulative return of the strategy in percent.'''
        final_return_percentage = (self.cumulative_return - 1) * 100
        print('=' * 55)
        print(f"Final Cumulative Return: {final_return_percentage:.2f}%")
        print('=' * 55)
        return final_return_percentage

    def plot_cumulative_returns(self):
        '''Plot cumulative return after each closed trade and return the figure.

        The figure is displayed only when at least one trade was closed.
        '''
        fig = go.Figure()
        fig.add_trace(go.Scatter(
            x = self.timestamps,
            y = self.cumulative_returns_history,
            mode = 'lines+markers',
            name = 'Cumulative Returns',
            line = dict(color = 'blue'),
            marker = dict(size = 6)))
        fig.update_layout(
            title = 'Cumulative Returns',
            xaxis_title = 'Time',
            yaxis_title = 'Cumulative Return',
            template = 'plotly_dark'
        )
        if self.cumulative_returns_history:
            display(fig)
        else:
            print("No closed trades, nothing to plot")
        return fig

    def summary(self):
        '''Summary of trading results with the cumulative returns visualised.'''
        self.calculate_pcreturns()
        self.plot_cumulative_returns()

In [None]:
# Instantiating the class runs the full session from __init__: connect, fetch
# bars, plot, trade, disconnect and print the summary.
sample = LiveMACrossoverBuy(symbol='NVDA', qty=20)
