## Trading BOT with Ale and Interactive Brokers

### This document aims to develop a Python bot that can interact with a demo account on Interactive Brokers (ibrk).

Motivation to use ibrk:
- safe broker
- many assets
- low fees
- good liquidity and low slippage
- mature and stable API

First of all we need the following ingredients:
- API
- Gateway
- The bot must be able to run on a cloud

[Video](http://youtube.me/tradingbotconale)



### Connecting and fetching historical data (sync and async examples)

In [1]:
# Connection snippet

from ib_insync import *
util.startLoop()  #needed to run ibrk api with jupyter
ib = IB()
ib.connect('127.0.0.1', 7497, clientId=5)

<IB connected to 127.0.0.1:7497 clientId=5>

### Fetch quotes for the Micro Nasdaq contract (ticker MNQ)

In [None]:
import pandas as pd

mnq_contract = Future('MNQ', '20250321', 'CME', multiplier=2, currency='USD')

def get_pandas_data_from_ibrk(ibrk_bars):
    df = pd.DataFrame([bar.__dict__ for bar in ibrk_bars], columns=['date', 'open', 'high', 'low', 'close', 'volume'])
    df.rename(columns={"date": "datetime"}, inplace=True)
    df.set_index('datetime', inplace=True)
    df.index = pd.to_datetime(df.index)
    return df

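import math

def getHistoricalData(contract, candles=1, tf_min=10):
    """Fetch roughly the last `candles` bars of `tf_min` minutes as a DataFrame."""
    # MIDPOINT for EUR (forex), TRADES for everything else
    data_type = "MIDPOINT" if contract.symbol == 'EUR' else "TRADES"
    duration_seconds = 60 * candles * tf_min
    # WORKAROUND: while the market is open, extend the window by the 15-minute
    # delay of demo data, otherwise the request returns no data
    duration_offset_for_delayed_data = 15 * 60
    duration_seconds += duration_offset_for_delayed_data

    # One day is 86400 seconds; longer windows are expressed in days, rounded up
    seconds_per_day = 86400
    if duration_seconds > seconds_per_day:
        duration = f"{math.ceil(duration_seconds / seconds_per_day)} D"
    else:
        duration = f"{duration_seconds} S"

    bars = ib.reqHistoricalData(
        contract,
        endDateTime="",  # empty string means "up to now"
        durationStr=duration,
        barSizeSetting=f"{tf_min} min" if tf_min == 1 else f"{tf_min} mins",
        whatToShow=data_type,
        useRTH=False  # include bars outside regular trading hours
    )

    return get_pandas_data_from_ibrk(bars)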

data = getHistoricalData(mnq_contract, 1, 10)
data.tail()


#### [reqHistoricalData](https://interactivebrokers.github.io/tws-api/classIBApi_1_1EClient.html#aad87a15294377608e59aec1d87420594) is the underlying fetch API

It is available with a demo account, whereas the realtime APIs are not.
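
The request describes its look-back window as a duration string (`"1500 S"`, `"21 D"`) and its candle size as a bar-size string (`"1 min"`, `"10 mins"`). The sketch below works through that arithmetic on its own, without a broker connection. The helper names `duration_string` and `bar_size_string` are hypothetical, and the 15-minute offset is the delayed-data workaround used in this notebook, not an API requirement.

```python
import math

SECONDS_PER_DAY = 86_400  # 24 * 60 * 60


def duration_string(candles, tf_min, delay_offset_s=15 * 60):
    """Build a duration string covering `candles` bars of `tf_min` minutes.

    `delay_offset_s` widens the window by the delayed-data offset
    (an assumption taken from this notebook's workaround).
    """
    seconds = 60 * candles * tf_min + delay_offset_s
    if seconds <= SECONDS_PER_DAY:
        return f"{seconds} S"
    # Round up so the window is never shorter than requested
    return f"{math.ceil(seconds / SECONDS_PER_DAY)} D"


def bar_size_string(tf_min):
    """Minute bar sizes are singular for 1 and plural otherwise."""
    return "1 min" if tf_min == 1 else f"{tf_min} mins"


print(duration_string(1, 10))     # 1500 S
print(duration_string(3000, 10))  # 21 D
print(bar_size_string(10))        # 10 mins
```

Rounding up matters for long windows: 3000 ten-minute candles plus the offset is about 20.8 days, so a floor division would request one day too few.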

The last candle returned is always the realtime candle, which is still forming; once it completes, a new candle is appended after it. The second-to-last candle is therefore the most recent closed candle.

This is very important when implementing a data provider that simulates live data.
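
To make the point concrete, here is a minimal sketch that replays three synthetic API responses and keeps only the candles that have closed. The function name `next_confirmed` and the sample prices are hypothetical; real responses would come from the historical data request.

```python
import pandas as pd


def next_confirmed(snapshot, seen):
    """Return the newest closed candle of `snapshot` if `seen` lacks it, else None."""
    closed = snapshot.iloc[:-1]  # drop the last row: it is still forming
    if closed.empty:
        return None
    if not seen.empty and closed.index[-1] <= seen.index[-1]:
        return None  # this closed candle is already stored
    return closed.iloc[[-1]]  # one-row DataFrame


# Synthetic 10-minute candles standing in for successive API responses
index = pd.date_range("2025-01-02 09:00", periods=3, freq="10min")
history = pd.DataFrame({"close": [100.0, 101.0, 102.0]}, index=index)
snapshots = [history.iloc[0:2], history.iloc[0:2], history.iloc[1:3]]

seen = pd.DataFrame(columns=["close"])
for snapshot in snapshots:
    bar = next_confirmed(snapshot, seen)
    if bar is not None:
        seen = bar if seen.empty else pd.concat([seen, bar])

print(seen)
```

The second snapshot repeats the first, so it adds nothing; only the 09:00 and 09:10 candles end up stored, while the 09:20 candle is still treated as in progress.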

## [Backtrader](https://www.backtrader.com/docu): an open source project ready to run a strategy on IBRK
  - Backtrader is an old but stable project that provides a ready-made API to interact with IBRK. The latest API is not the one described in the official documentation; it is maintained as a separate open source project on [GitHub](https://github.com/atreyuxtrading/atreyu-backtrader-api)
  - Backtrader requires realtime data, but the realtime API cannot be used with an ibrk demo account: realtime requests fail.
  - The APIs for fetching historical data are allowed, but the data is delayed.
  - The delayed data returned by these APIs is consistent with the chart and prices shown in TWS, so it is possible to run a strategy in paper trading.
  - We need to implement a custom data provider that uses the historical data APIs and is compatible with the backtrader interface.
  

### Backtrader RSI long-only strategy on Micro Nasdaq (MNQ)

In [None]:
# backtrader strategy example: long only, buy when rsi > 60, sell when rsi < 40

import backtrader as bt
import warnings
warnings.filterwarnings('ignore')

class RsiLongStrategy(bt.Strategy):

    def log(self, txt, dt=None):
        # Prefix each message with the date of the current bar
        dt = dt or self.datas[0].datetime.date(0)
        print('%s, %s' % (dt.isoformat(), txt))

    def __init__(self):
        self.rsi = bt.indicators.RSI_SMA(self.data.close, period=21)

    def next(self):
        # Enter long when flat and RSI is above 60
        if self.rsi > 60 and self.position.size == 0:
            self.log('BUY CREATE, %.2f' % self.data.close[0])
            self.buy()

        # Close the long position when RSI falls below 40
        elif self.rsi < 40 and self.position.size > 0:
            self.log('SELL CREATE, %.2f' % self.data.close[0])
            self.sell()

    def notify_order(self, order):
        # backtrader calls notify_order (not notify) on every order status change
        if order.status in [order.Submitted, order.Accepted]:
            return
        if order.status == order.Completed:
            side = 'BUY' if order.isbuy() else 'SELL'
            self.log(f'{side} EXECUTED, Price: {order.executed.price:.2f} Cost: {order.executed.value:.2f} Comm {order.executed.comm:.2f}')
        elif order.status in [order.Canceled, order.Margin, order.Rejected]:
            # These orders were not filled, so there is no execution to report
            self.log('Order Canceled/Margin/Rejected')


ibrk_data = getHistoricalData(mnq_contract, candles = 3000, tf_min = 10)
data = bt.feeds.PandasData(dataname=ibrk_data)

cerebro = bt.Cerebro()
cerebro.broker.setcash(10000.0)
cerebro.broker.setcommission(commission=0.77, margin=2000.0, mult=2.0)
cerebro.adddata(data)
cerebro.addstrategy(RsiLongStrategy)
result = cerebro.run()
cerebro.plot(iplot=False)
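
The entry and exit rule can also be checked without backtrader or a broker connection. The sketch below is an approximation of an SMA-based RSI written in pandas; it has not been compared against the output of `bt.indicators.RSI_SMA`, and the names `rsi_sma` and `long_only_signals` as well as the synthetic price series are hypothetical.

```python
import pandas as pd


def rsi_sma(close, period=21):
    """RSI using simple moving averages of gains and losses."""
    delta = close.diff()
    avg_gain = delta.clip(lower=0).rolling(period).mean()
    avg_loss = delta.clip(upper=0).abs().rolling(period).mean()
    rs = avg_gain / avg_loss  # inf when there are no losses in the window
    return 100 - 100 / (1 + rs)


def long_only_signals(rsi, buy_above=60, sell_below=40):
    """Buy when flat and RSI > buy_above; sell when long and RSI < sell_below."""
    in_position = False
    signals = []
    for ts, value in rsi.items():
        if pd.isna(value):
            continue  # indicator not warmed up yet
        if not in_position and value > buy_above:
            signals.append((ts, "BUY"))
            in_position = True
        elif in_position and value < sell_below:
            signals.append((ts, "SELL"))
            in_position = False
    return signals


# Synthetic prices: 30 rising bars followed by 30 falling bars
prices = [100 + i for i in range(30)] + [128 - i for i in range(30)]
close = pd.Series(prices, dtype=float)

signals = long_only_signals(rsi_sma(close, period=5))
print(signals)
```

On this series the rule buys once the indicator has warmed up during the rise and sells once shortly after prices turn down, which is the behaviour expected from a long-only momentum filter.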

## Extend the backtrader data feed to use the ibrk APIs available on a demo account
- [backtrader documentation](https://www.backtrader.com/docu/extending-a-datafeed/)
- The implementation uses the ibrk Python API (ib_insync) available [here](https://ib-insync.readthedocs.io/)


In [None]:
import threading
import time
import pandas as pd
from datetime import datetime

# Initialize an empty DataFrame
live_data = pd.DataFrame(columns=['open', 'high', 'low', 'close', 'volume'])

# Lock for thread-safe access to live_data
data_lock = threading.Lock()


# The IB client `ib` is the one already connected in the connection cell above

# Define contracts
dax_contract = Future('DAX', '20250321', 'EUREX', multiplier=1)
# eur_usd_contract = Forex('EURUSD')
# mnq_contract = Future('MNQ', '20250321', 'CME', multiplier=2, currency="USD")  # Update expiry as needed


def fetch_next_bar(contract, data, tf_min = 10):
    ''' Return the next finalized candle that is not yet in `data` (the live_data dataframe).
        The ibrk API always returns the realtime candle, still forming, as the last row (also with delayed data),
        so the second-to-last row is checked instead: if it is newer than the last stored candle,
        it is returned as the new confirmed candle. Otherwise the function returns None.
    '''
    # get last two candles
    last_candles = getHistoricalData(contract, 2, tf_min)

    # Ensure we have the required data
    if len(last_candles) < 2:
        return None

    # The second-to-last candle is the most recently confirmed historical candle
    finalized_candle = last_candles.iloc[-2]
    finalized_candle_time = last_candles.index[-2]

    # Check if this finalized candle is new
    if data.empty or finalized_candle_time > data.index[-1]:
        # Return the finalized candle
        return {
            'datetime': finalized_candle_time,
            'open': finalized_candle.open,
            'high': finalized_candle.high,
            'low': finalized_candle.low,
            'close': finalized_candle.close,
            'volume': finalized_candle.volume,
        }

    # No new confirmed candle
    return None


def fetch_live_data(contract, tf_min):
    global live_data
    print(f"fetch timeframe {tf_min}")
    while True:
        try:

            # Fetch the next confirmed candle
            new_bar = fetch_next_bar(contract, live_data, tf_min)

            if new_bar:
                print(f"New bar: {new_bar}")
                new_data = pd.DataFrame([new_bar]).set_index('datetime')
                with data_lock:
                    live_data = pd.concat([live_data, new_data]) if not live_data.empty else new_data

        except Exception as e:
            print(f"Error in fetch_live_data: {e}")

        ib.sleep(15)  # Poll every 15 seconds; ib.sleep keeps the ib_insync event loop running

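# Run this cell only while the market for dax_contract is open, or switch to another contract
# (see the commented-out contract definitions above).
# Note: this call blocks and polls forever; interrupt the kernel to stop it.
fetch_live_data(dax_contract, 1)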

In [9]:
# Python data provider using the Python threading API

import time
from datetime import datetime  # used by the index type check in _load
import backtrader as bt

class PandasLiveData(bt.feed.DataBase):
    params = (
        ('timeframe', bt.TimeFrame.Minutes),  # Adjust timeframe
        ('compression', 1),
    )

    def __init__(self):
        super().__init__()
        self.current_index = 0
        self.terminate = False  # Flag to indicate when to stop

    def start(self):
        super().start()

    def islive(self):
        return True  # Inform Backtrader this is a live feed

    def _load(self):
        global live_data

        while True:
            with data_lock:
                if self.terminate:
                    return False  # Stop processing if terminate is set

                if self.current_index < len(live_data):
                    row = live_data.iloc[self.current_index]
                    self.current_index += 1
                    break

            time.sleep(0.1)  # Wait briefly before retrying

        if not isinstance(row.name, datetime):
            raise ValueError("The index of the data must be a datetime object.")

        self.lines.datetime[0] = bt.date2num(row.name)
        self.lines.open[0] = row['open']
        self.lines.high[0] = row['high']
        self.lines.low[0] = row['low']
        self.lines.close[0] = row['close']
        self.lines.volume[0] = row['volume']

        return True
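
The hand-off used by `_load` (a shared container, a lock, and a read cursor) can be exercised on its own, without backtrader or a broker. This is a minimal sketch under that assumption: the class `BarBuffer`, the function `produce`, and the sample bars are hypothetical stand-ins for `live_data`, the polling loop, and real candles.

```python
import threading
import time


class BarBuffer:
    """Append-only bar store with a read cursor, guarded by a lock."""

    def __init__(self):
        self._bars = []
        self._lock = threading.Lock()
        self._cursor = 0  # index of the next bar the consumer has not read

    def append(self, bar):
        with self._lock:
            self._bars.append(bar)

    def next_unread(self):
        """Return the next unread bar, or None if nothing new has arrived."""
        with self._lock:
            if self._cursor < len(self._bars):
                bar = self._bars[self._cursor]
                self._cursor += 1
                return bar
        return None


def produce(buffer, bars, delay_s=0.01):
    """Stand-in for the polling loop: publish one bar at a time."""
    for bar in bars:
        time.sleep(delay_s)
        buffer.append(bar)


buffer = BarBuffer()
bars = [{"close": 100.0}, {"close": 101.5}, {"close": 99.75}]
worker = threading.Thread(target=produce, args=(buffer, bars), daemon=True)
worker.start()

received = []
while len(received) < len(bars):
    bar = buffer.next_unread()
    if bar is None:
        time.sleep(0.005)  # wait briefly outside the lock, then retry
        continue
    received.append(bar)
worker.join()
print(received)
```

Sleeping outside the `with` block keeps the lock free for the producer while the consumer waits, which is the same reason `_load` sleeps after releasing `data_lock`.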

In [10]:
# Test strategy: just print each bar received from the data provider

import backtrader as bt

class TestStrategy(bt.Strategy):
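    def __init__(self):
        pass

    def next(self):
        # Print the timestamp and close of the bar that was just delivered
        print(f"New bar: {self.data.datetime.datetime(0)} close={self.data.close[0]}")
        # put here logic to trade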

In [None]:
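import threading

cerebro = bt.Cerebro()
# PandasLiveData reads new rows from the global live_data under data_lock
cerebro.adddata(PandasLiveData())
cerebro.addstrategy(TestStrategy)

# cerebro.run() blocks, so it runs in a background thread. The polling loop stays on
# the main thread (assumption: ib_insync calls belong on the thread that owns its event loop).
threading.Thread(target=cerebro.run, daemon=True).start()

# fetch_live_data is a plain blocking function, not a coroutine, so it cannot be passed
# to asyncio.create_task; call it directly and interrupt the kernel to stop it.
fetch_live_data(dax_contract, 1)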

## Bot implementation to run the RsiLongStrategy

In [None]:
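import threading

cerebro = bt.Cerebro()
# PandasLiveData reads new rows from the global live_data under data_lock
cerebro.adddata(PandasLiveData())
cerebro.addstrategy(RsiLongStrategy)

# cerebro.run() blocks, so it runs in a background thread. The polling loop stays on
# the main thread (assumption: ib_insync calls belong on the thread that owns its event loop).
threading.Thread(target=cerebro.run, daemon=True).start()

# fetch_live_data is a plain blocking function, not a coroutine, so it cannot be passed
# to asyncio.create_task; call it directly and interrupt the kernel to stop it.
fetch_live_data(mnq_contract, 10)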