In [1]:
"""
concurrency.py

This script demonstrates a rudimentary concurrent trading system using the TWS API.
It combines multiple data streams—account updates, position updates, market scanner subscription,
and real-time market data (tickPrice)—to decide when to enter or exit trades.

The system uses threading to ensure that the API's event loop runs concurrently with our main code.
Throughout the code, extensive comments are provided to explain the logic and how each component works.
The trading strategy used here is improved compared to a basic demo:
    - It monitors the account and disconnects if the total market value of the held positions drops by more than a configurable fraction (max_perc_loss) of its initial value.
    - It requests position updates to track current holdings, preventing unintended short positions.
    - It uses a market scanner to subscribe to the top MOST_ACTIVE stocks on major U.S. exchanges.
    - For each of the top five scanner results, it subscribes to market data.
    - In the tickPrice callback, it compares the new tick price to the previously stored price.
      • If the price rises by more than 5% AND no position is currently held, it triggers a BUY order.
      • If the price falls by more than 6% AND a position is currently held, it triggers a SELL order.
    - The strategy uses a fixed order quantity (with a slight variation if adding to an existing position).
    - All these functions operate concurrently via the TWS API's event loop.
"""

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.tag_value import TagValue
from ibapi.order import Order
from ibapi.contract import Contract
from ibapi.scanner import ScannerSubscription

import datetime
import time
import threading

import logging
# Send log records of level INFO and above to the console; every record is
# prefixed with its timestamp and its level name.
logging.basicConfig(format="%(asctime)s [%(levelname)s] %(message)s", level=logging.INFO)

In [2]:
class TestApp(EClient, EWrapper):
    """
    IB API client that tracks account/portfolio updates and disconnects on a large loss.

    At the end of the first complete update cycle (signalled via accountDownloadEnd),
    the most recently received StockMarketValue is saved as the baseline in
    self.shares_init_amount.

    From then on, every updateAccountValue callback that arrives while the portfolio
    differs from the last reported snapshot records NetLiquidation and the summed market
    value of all positions under the current account time, prints a report, and
    disconnects if that summed value has fallen below
    shares_init_amount * (1 - max_perc_loss).
    """

    def __init__(self, max_perc_loss: float):
        """
        Create the client and initialise all tracking state.

        Args:
            max_perc_loss: Maximum tolerated drop of the portfolio value relative to the
                baseline. Either a fraction in [0, 1] (0.05 means 5%) or, for
                convenience, a percentage in (1, 100] (5 also means 5%).

        Raises:
            ValueError: If max_perc_loss is negative, greater than 100 or not a number.
        """
        # Initialize the EClient with self as the wrapper.
        EClient.__init__(self, self)

        # Always stored as a fraction, so the threshold arithmetic below is unambiguous.
        self.max_perc_loss = self._as_fraction(max_perc_loss)

        # Order id bookkeeping; stays None until nextValidId delivers the first id.
        self.orderId = None

        # Account values.
        self.TotalCashBalance = {}   # currency -> cash balance
        self.NetLiquidation = None
        self.StockMarketValue = None

        # Portfolio details.
        self.portfolio = {}          # symbol -> market value
        self.curr_portfolio = None   # snapshot of self.portfolio at the last report

        # Aggregated values per account time.
        self.total_owned = {}        # account time -> NetLiquidation
        self.total_shares = {}       # account time -> summed portfolio market value

        # Account time of the current update cycle.
        self.account_time = None

        # Baseline for the safety check (set once, at the end of the first update cycle).
        self.shares_init_amount = None

    @staticmethod
    def _as_fraction(max_perc_loss: float) -> float:
        """Return the loss threshold as a fraction, accepting fractions or percentages."""
        loss = float(max_perc_loss)
        if not 0 <= loss <= 100:
            raise ValueError(
                f"max_perc_loss must be a fraction in [0, 1] or a percentage in (1, 100], "
                f"got {max_perc_loss!r}"
            )
        # A value above 1 cannot be a meaningful fraction (the threshold would become
        # negative and the safety check could never fire), so read it as a percentage.
        return loss / 100 if loss > 1 else loss

    def nextValidId(self, orderId: int):
        """Callback: store the order id handed to us so nextId can count up from it."""
        self.orderId = orderId

    def nextId(self) -> int:
        """
        Return a fresh order/request id by incrementing the stored one.

        Raises:
            RuntimeError: If no id has been received through nextValidId yet.
        """
        if self.orderId is None:
            raise RuntimeError("nextId() called before nextValidId delivered an order id")
        self.orderId += 1
        return self.orderId

    def error(self, reqId: int, errorTime: int, errorCode: int,
              errorString: str, advancedOrderReject=""):
        """
        Callback: log every message delivered through the error channel.

        Messages are logged rather than silently dropped so that failed requests
        remain visible.
        """
        logging.warning(
            "IB message - reqId:%s, errorTime:%s, errorCode:%s, errorString:%s, OrderReject:%s",
            reqId, errorTime, errorCode, errorString, advancedOrderReject,
        )

    def updateAccountValue(self, key: str, val: str, currency: str, accountName: str):
        """
        Callback: store the incoming account value and run the portfolio safety check.

        Only "TotalCashBalance" (kept per currency), "NetLiquidation" and
        "StockMarketValue" are stored; all other keys are ignored. Once the baseline
        has been set, any call that finds the portfolio changed since the last report
        triggers a new report and the loss check.
        """
        # NOTE(review): NetLiquidation and StockMarketValue ignore `currency`, so if
        # they are reported for several currencies the last one received wins.
        if key == "TotalCashBalance":
            self.TotalCashBalance[currency] = float(val)
        elif key == "NetLiquidation":
            self.NetLiquidation = float(val)
        elif key == "StockMarketValue":
            self.StockMarketValue = float(val)

        # Wait for accountDownloadEnd to set the baseline, and report only when the
        # portfolio actually changed since the previous report.
        if self.shares_init_amount is None or self.portfolio == self.curr_portfolio:
            return
        self._report_and_check_portfolio()

    def _report_and_check_portfolio(self):
        """Record and print the current totals, then disconnect if the loss limit is hit."""
        print('[EXEC] updateAccountValue')
        shares_curr_amount = round(sum(self.portfolio.values()), 2)

        # Entries recorded under the same account time overwrite each other.
        self.total_owned[self.account_time] = self.NetLiquidation
        self.total_shares[self.account_time] = shares_curr_amount

        print("TotalCashBalance:", self.TotalCashBalance)
        print("Current Portfolio:", self.portfolio)
        print("Total EUR owned amounts:", self.total_owned)
        print("Total USD shares amounts:", self.total_shares)

        # Copy, so later in-place changes to self.portfolio are detected as a change.
        self.curr_portfolio = self.portfolio.copy()

        # Safety check (disconnect if the shares amount drops below the threshold).
        if shares_curr_amount < self.shares_init_amount * (1 - self.max_perc_loss):
            print(f"shares_curr_amount {shares_curr_amount:.2f} "
                  f"is lower than the shares_init_amount {self.shares_init_amount:.2f} "
                  f"(a drop of more than {self.max_perc_loss:.2%}).")
            print("Disconnecting for safety!")
            self.disconnect()

    def updatePortfolio(self, contract: Contract, position: float, marketPrice: float,
                        marketValue: float, averageCost: float, unrealizedPNL: float,
                        realizedPNL: float, accountName: str):
        """
        Callback: called for each portfolio update.

        Stores the market value of the position keyed by the contract symbol; the
        other arguments are not used.
        """
        self.portfolio[contract.symbol] = marketValue

    def updateAccountTime(self, timeStamp: str):
        """Callback: save the account time used as the key for the recorded totals."""
        self.account_time = timeStamp

    def accountDownloadEnd(self, accountName: str):
        """
        Callback: called when the entire batch of account (and portfolio) updates is done.

        Sets the baseline shares_init_amount from the latest StockMarketValue, but only
        the first time; later calls leave an existing baseline untouched.
        """
        print('[EXEC] accountDownloadEnd')
        # Setting the baseline here (and not inside updateAccountValue) ensures it is
        # taken from the completed first update cycle.
        if self.shares_init_amount is None:
            if self.StockMarketValue is None:
                logging.warning("accountDownloadEnd received without any StockMarketValue; "
                                "the safety check stays inactive")
            self.shares_init_amount = self.StockMarketValue
            print('shares_init_amount:', self.shares_init_amount)



    # def position(self, account, contract: Contract, position, avgCost):
    #     """
    #     Callback: Receives updates for positions held.
    #     We record the current position for each symbol in our position_ref dictionary,
    #     ensuring that trading logic can check current holdings to avoid short positions.
    #     """
    #     position_ref[contract.symbol] = position
    #     print(f"[Position Update] {contract.symbol}: Position = {position}, AvgCost = {avgCost}")

    # def scannerData(self, reqId, rank, contractDetails, distance, benchmark, projection, legsStr):
    #     """
    #     Callback: Receives results from a market scanner subscription.
    #     We limit our processing to the top 5 results (rank < 5).
    #     For each result:
    #       - Create a unique rank-based request ID.
    #       - Save the contract details in the 'TotalCashBalance' dictionary.
    #       - Initialize the current position for that symbol to zero in 'position_ref'.
    #       - Request real-time market data for the contract.
    #       - Log the symbol and exchange for verification.
    #     """
    #     # Process only top 5 scanner results
    #     if rank < 5:
    #         rankId = rank + reqId  # Create a unique id combining reqId and rank
    #         # Save the contract information in the TotalCashBalance dictionary
    #         TotalCashBalance[rankId] = {"contract": contractDetails.contract}
    #         # Initialize our position for this contract (avoid short positions)
    #         position_ref[contractDetails.contract.symbol] = 0
    #         # Request live market data for this contract (tickPrice callbacks will be received using rankId)
    #         self.reqMktData(rankId, contractDetails.contract, "", False, False, [])
    #         print(f"[Scanner] Rank {rank}: {contractDetails.contract.symbol} on {contractDetails.contract.exchange}")

    # def scannerDataEnd(self, reqId):
    #     """
    #     Callback: Indicates that the market scanner subscription has finished sending data.
    #     Here, we cancel the scanner subscription to avoid duplicate data, but we do not disconnect.
    #     """
    #     print(f"[Scanner] Data end for reqId: {reqId}")
    #     self.cancelScannerSubscription(reqId)

    # def tickPrice(self, reqId, tickType, price, attrib):
    #     """
    #     Callback: Receives real-time market data tick prices.
    #     Implements our improved trading strategy:
    #       - For each tick, if this is the first price received for the instrument,
    #         store it as our reference "LAST" price.
    #       - Otherwise, compare the new price with the stored "LAST" price:
    #          • If price increases by more than 5% compared to the stored price and there is no current position,
    #            place a BUY market order.
    #          • If price decreases by more than 6% compared to the stored price and we have a long position,
    #            place a SELL market order.
    #       - After processing, update the stored "LAST" price to the current price.
    #     """
    #     # We only process tick prices if the tickType is relevant (assume tickType corresponds to LAST price)
    #     # Check if we have stored a "LAST" price for this reqId; if not, initialize it.
    #     if reqId not in TotalCashBalance or "LAST" not in TotalCashBalance[reqId]:
    #         TotalCashBalance.setdefault(reqId, {})["LAST"] = price
    #         # Initial logging for the first tick.
    #         print(f"[Tick Init] reqId: {reqId}, Initial LAST price set to: {price}")
    #         return  # Return early since there's no previous price to compare with

    #     # Retrieve the previously stored price
    #     previous_price = TotalCashBalance[reqId]["LAST"]
    #     # Retrieve the contract details stored earlier for this reqId.
    #     contract = TotalCashBalance[reqId]["contract"]

    #     # Create a new market order object
    #     order = Order()
    #     order.tif = "DAY"           # Time in Force: Order is valid for the day
    #     order.orderType = "MKT"     # Market Order
    #     # Strategy Improvement:
    #     # If we are not holding any position for this symbol, set an initial order quantity.
    #     # If we already have a position, we may choose to add or reduce the holding.
    #     initial_qty = 5  # Default order quantity

    #     # Improved Logic:
    #     # Check if the new price is significantly higher than the previous price (more than 5% gain).
    #     if price > previous_price * 1.05:
    #         # Only consider buying if we are not already in a position or choose to scale in.
    #         if position_ref.get(contract.symbol, 0) == 0:
    #             order.action = "BUY"
    #             order.totalQuantity = initial_qty
    #             print(f"[Signal BUY] {contract.symbol}: Price increased from {previous_price:.2f} to {price:.2f}. No current position; buying {order.totalQuantity} shares.")
    #             self.placeOrder(self.nextId(), contract, order)
    #         else:
    #             # Optionally, add to an existing position if desired. Here, we add a smaller quantity.
    #             order.action = "BUY"
    #             order.totalQuantity = 3
    #             print(f"[Signal BUY Add] {contract.symbol}: Price increased from {previous_price:.2f} to {price:.2f}. Already holding {position_ref[contract.symbol]} shares; adding {order.totalQuantity} shares.")
    #             self.placeOrder(self.nextId(), contract, order)
        
    #     # Check if the new price is significantly lower than the previous price (more than 6% drop)
    #     elif price < previous_price * 0.94:
    #         # Only sell if we have a current long position; avoid going short.
    #         if position_ref.get(contract.symbol, 0) >= initial_qty:
    #             order.action = "SELL"
    #             # Sell a fixed quantity (could be full position if desired)
    #             order.totalQuantity = initial_qty
    #             print(f"[Signal SELL] {contract.symbol}: Price dropped from {previous_price:.2f} to {price:.2f}. Holding {position_ref[contract.symbol]} shares; selling {order.totalQuantity} shares.")
    #             self.placeOrder(self.nextId(), contract, order)
    #         else:
    #             print(f"[No SELL] {contract.symbol}: Price dropped from {previous_price:.2f} to {price:.2f} but insufficient position to sell.")
        
    #     # Update the stored LAST price for future comparisons.
    #     TotalCashBalance[reqId]["LAST"] = price

    # def openOrder(self, orderId, contract: Contract, order, orderState):
    #     """
    #     Callback: Informs us about the status of open orders.
    #     Here, we log any rejected orders with a timestamp for review.
    #     """
    #     if orderState.status == "Rejected":
    #         print(f"[Order Rejected] {datetime.datetime.now()} ID:{orderId} || {order.action} {order.totalQuantity} {contract.symbol}")

    # def execDetails(self, reqId, contract: Contract, execution):
    #     """
    #     Callback: Returns execution details upon order fills.
    #     This example prints a simple summary of the trade execution.
    #     """
    #     print(f"[Execution] Order ID:{execution.orderId} || {execution.side} {execution.shares} {contract.symbol} @ {execution.time}")




In [3]:
port = 7496  # TWS live-trading socket port (TWS paper: 7497; IB Gateway live/paper: 4001/4002)
clientId = 101
# Maximum loss allowed, expressed as a fraction (0.05 == 5%). The app disconnects
# automatically once the portfolio value has dropped by more than this.
max_perc_loss = 0.05

# Create an instance of the TestApp and connect to TWS.
app = TestApp(max_perc_loss)
app.connect("localhost", port, clientId)  # "localhost" == "127.0.0.1"

# Start the API processing loop in a separate thread so that it does not block the main thread.
# With daemon=True the event-loop thread running app.run() cannot hold up the termination of
# the overall program: the main thread handles shutdown without waiting for it to finish.
threading.Thread(target=app.run, daemon=True).start()

# Wait (bounded) until the first order id has arrived through nextValidId, which shows the
# connection is usable; a fixed pause can be too short or needlessly long.
handshake_deadline = time.monotonic() + 10
while getattr(app, "orderId", None) is None and time.monotonic() < handshake_deadline:
    time.sleep(0.1)
if getattr(app, "orderId", None) is None:
    logging.warning("No nextValidId received within 10 s; is TWS running and accepting "
                    "API connections on port %s?", port)


2025-05-09 18:27:56,017 [INFO] sent startApi
2025-05-09 18:27:56,019 [INFO] REQUEST startApi {}
2025-05-09 18:27:56,021 [INFO] SENDING startApi b'\x00\x00\x00\x0b\x00\x00\x00G2\x00101\x00\x00'
2025-05-09 18:27:56,021 [INFO] ANSWER connectAck {}
2025-05-09 18:27:56,027 [INFO] ANSWER openOrderEnd {}
2025-05-09 18:27:56,134 [INFO] ANSWER managedAccounts {'accountsList': '<redacted>'}


In [4]:
# # Request position updates so that position_ref is kept up to date with our current holdings.
# app.reqPositions()

# # ------------------------------------------------------------------------------------
# # Set up a market scanner subscription to identify trending opportunities.
# sub = ScannerSubscription()
# sub.instrument = "STK"             # We are interested in stocks.
# sub.locationCode = "STK.US.MAJOR"    # Focus on major US exchanges (NYSE, NASDAQ, AMEX, etc.).
# sub.scanCode = "MOST_ACTIVE"         # Use the "MOST_ACTIVE" scan to get highly traded stocks.

# # There might be additional "scan_options", but here we leave the list empty.
# scan_options = []

# # Define filter options to narrow down the scanner results:
# filter_options = [
#     TagValue("avgVolumeAbove", "1000000"),  # Stocks with an average trading volume above 1,000,000.
#     TagValue("priceAbove", "10")            # Stocks priced above $10 to avoid very low-priced ones.
# ]

# # Request the market scanner subscription using a unique request ID.
# app.reqScannerSubscription(app.nextId(), sub, scan_options, filter_options)

# # Note: The program will continue to run with ongoing account updates, position updates,
# # market scanner data, and tick price processing concurrently.
# # Orders will be placed automatically by the logic in tickPrice based on market conditions.

In [5]:
# Subscribe to the continuous stream of account and portfolio updates.
# The data is delivered through the 'updateAccountValue', 'updatePortfolio',
# 'updateAccountTime' and 'accountDownloadEnd' callbacks of the app.
app.reqAccountUpdates(
    subscribe=True,  # True: start receiving account and portfolio updates; False: stop
    acctCode="",     # account code for which to receive the updates
)


2025-05-09 18:27:57,071 [INFO] REQUEST reqAccountUpdates {'subscribe': True, 'acctCode': ''}
2025-05-09 18:27:57,071 [INFO] SENDING reqAccountUpdates b'\x00\x00\x00\t\x00\x00\x00\x062\x001\x00\x00'


[EXEC] accountDownloadEnd
shares_init_amount: 294.67
[EXEC] updateAccountValue
TotalCashBalance: {'BASE': 2741.8672, 'EUR': 2399.2, 'USD': 386.0}
Current Portfolio: {'BBVA': 5.49, 'COOP': 3.01, 'DB': 27.28, 'LIN': 3.39, 'MUFG': 4.13, 'NFLX': 6.96, 'NIO': 5.99, 'NVDA': 116.29, 'SAN': 3.71, 'SE': 9.34, 'WBD': 9.08, 'WMT': 99.99}
Total EUR owned amounts: {'18:26': 3003.61}
Total USD shares amounts: {'18:26': 294.66}
[EXEC] updateAccountValue
TotalCashBalance: {'BASE': 2741.9527, 'EUR': 2399.2, 'USD': 386.0}
Current Portfolio: {'BBVA': 5.48, 'COOP': 3.02, 'DB': 27.28, 'LIN': 3.39, 'MUFG': 4.13, 'NFLX': 6.97, 'NIO': 5.99, 'NVDA': 116.29, 'SAN': 3.71, 'SE': 9.34, 'WBD': 9.08, 'WMT': 99.97}
Total EUR owned amounts: {'18:26': 3003.61, '18:29': 3003.74}
Total USD shares amounts: {'18:26': 294.66, '18:29': 294.65}
[EXEC] updateAccountValue
TotalCashBalance: {'BASE': 2741.9527, 'EUR': 2399.2, 'USD': 386.0}
Current Portfolio: {'BBVA': 5.48, 'COOP': 3.02, 'DB': 27.27, 'LIN': 3.39, 'MUFG': 4.13, 'NF