# IBKR Positions to Database - Streamlined
Minimal code version focused on efficient database writes

In [1]:
import time
import threading
import pandas as pd
import random
import datetime
import psycopg2
import os
import sys
import json
from decimal import Decimal
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
def safe_float_convert(value):
    """Safely convert any value to float, handling Decimal types"""
    if value is None or pd.isna(value):
        return None
    if isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            return None
    if isinstance(value, Decimal):
        return float(value)
    try:
        return float(value)
    except (ValueError, TypeError):
        return None

: 

: 

In [None]:
class IBKRPositionApp(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.positions = {}
        self.market_data = {}
        self.req_id = 1000
        self.position_data_received = False
        self.market_data_requests = {}
        self.account_updates = {}
        self.account_update_complete = False
        
    def connectTWS(self, port=7497):
        """Connect to TWS or IB Gateway"""
        try:
            client_id = random.randint(1, 999)
            import subprocess
            try:
                result = subprocess.run(['ip', 'route', 'show'], capture_output=True, text=True)
                host_ip = None
                for line in result.stdout.split('\n'):
                    if 'default via' in line:
                        host_ip = line.split()[2]
                        break
            except:
                host_ip = '127.0.0.1'
            
            self.connect(host_ip, port, client_id)
            thread = threading.Thread(target=self.run)
            thread.daemon = True
            thread.start()
            time.sleep(2)
            
            if self.isConnected():
                logger.info(f"Connected to TWS/Gateway on {host_ip}:{port}")
                return True
            return False
        except Exception as e:
            logger.error(f"Connection error: {e}")
            return False
    
    def position(self, account, contract, position, avgCost):
        """Callback for position data"""
        key = f"{contract.symbol}_{contract.secType}_{contract.strike}_{contract.right}_{contract.lastTradeDateOrContractMonth}"
        
        avg_cost_raw = safe_float_convert(avgCost)
        position_float = safe_float_convert(position)
        multiplier = 100.0 if contract.secType == 'OPT' else 1.0
        avg_cost_per_unit = (avg_cost_raw / multiplier) if avg_cost_raw is not None else None

        self.positions[key] = {
            'Account': account,
            'Symbol': contract.symbol,
            'SecType': contract.secType,
            'Description': f"{contract.symbol} {contract.lastTradeDateOrContractMonth} {contract.strike} {contract.right}",
            'AvgCost': avg_cost_per_unit,
            'Strike': safe_float_convert(contract.strike) if hasattr(contract, 'strike') else None,
            'Right': contract.right if hasattr(contract, 'right') else None,
            'Expiry': contract.lastTradeDateOrContractMonth,
            'Position': position_float,
            'Contract': contract,
            'ConId': contract.conId,
            'CurrentPrice': None,
            'MarketVal': None,
            'UnrealizedPnL': None
        }
    
    def positionEnd(self):
        """Callback when all positions have been received"""
        self.position_data_received = True
        logger.info("All position data received")
    
    def updateAccountValue(self, key, val, currency, accountName):
        """Callback for account value updates"""
        # Store account level data if needed
        pass
    
    def updatePortfolio(self, contract, position, marketPrice, marketValue, averageCost, 
                       unrealizedPNL, realizedPNL, accountName):
        """Callback for portfolio updates with market values"""
        key = f"{contract.symbol}_{contract.secType}_{contract.strike}_{contract.right}_{contract.lastTradeDateOrContractMonth}"
        
        print(f"Portfolio update for {key}:")
        print(f"  Position: {position}")
        print(f"  Market Price: {marketPrice}")
        print(f"  Market Value: {marketValue}")
        print(f"  Average Cost: {averageCost}")
        print(f"  Unrealized PnL: {unrealizedPNL}")
        
        # Store the portfolio update data
        self.account_updates[key] = {
            'MarketPrice': safe_float_convert(marketPrice),
            'MarketValue': safe_float_convert(marketValue),
            'UnrealizedPNL': safe_float_convert(unrealizedPNL),
            'AverageCost': safe_float_convert(averageCost),
            'Position': safe_float_convert(position)
        }
    
    def accountDownloadEnd(self, accountName):
        """Callback when account download is complete"""
        self.account_update_complete = True
        print(f"Account download complete for {accountName}")
    
    def error(self, reqId, errorCode, errorString):
        """Callback for error messages"""
        # Only print non-informational errors
        if errorCode not in [2104, 2106, 2158]:
            print(f"ERROR: reqId={reqId}, code={errorCode}, msg={errorString}")
    
    def get_positions_data(self):
        """Request positions and wait for data"""
        self.reqPositions()
        timeout = 30
        start_time = time.time()
        
        while not self.position_data_received and (time.time() - start_time) < timeout:
            time.sleep(0.1)
        
        return len(self.positions)
    
    def get_account_updates(self, account_id):
        """Request account updates to get market values"""
        print(f"\n=== Requesting account updates for {account_id} ===")
        self.reqAccountUpdates(True, account_id)
        
        # Wait for account updates
        timeout = 10
        start_time = time.time()
        while not self.account_update_complete and (time.time() - start_time) < timeout:
            time.sleep(0.1)
        
        # Stop account updates
        self.reqAccountUpdates(False, account_id)
        
        # Update positions with account data
        print(f"\n=== Updating positions with account data ===")
        for key, pos in self.positions.items():
            if key in self.account_updates:
                acc_data = self.account_updates[key]
                market_price = acc_data.get('MarketPrice')
                market_value = acc_data.get('MarketValue')
                unrealized_pnl = acc_data.get('UnrealizedPNL')
                
                # For options, market price needs to be per contract (not per share)
                if pos['SecType'] == 'OPT' and market_price is not None:
                    # IBKR gives market price per share, convert to per contract
                    self.positions[key]['CurrentPrice'] = market_price
                    self.positions[key]['MarketVal'] = market_value
                    self.positions[key]['UnrealizedPnL'] = unrealized_pnl
                    print(f"Updated {key}: Price={market_price}, MktVal={market_value}, PnL={unrealized_pnl}")
                else:
                    self.positions[key]['CurrentPrice'] = market_price
                    self.positions[key]['MarketVal'] = market_value
                    self.positions[key]['UnrealizedPnL'] = unrealized_pnl
                    print(f"Updated {key}: Price={market_price}, MktVal={market_value}, PnL={unrealized_pnl}")
            else:
                print(f"No account update for {key}")
    
    def find_vertical_spreads(self, df):
        """Identify vertical spreads from options positions"""
        options_df = df[df['SecType'] == 'OPT'].copy()
        if options_df.empty:
            return pd.DataFrame()
        
        spreads = []
        grouped = options_df.groupby(['Symbol', 'Right', 'Expiry'])
        
        for (symbol, right, expiry), group in grouped:
            if len(group) >= 2:
                for i in range(len(group)):
                    for j in range(i + 1, len(group)):
                        pos1, pos2 = group.iloc[i], group.iloc[j]
                        
                        strike1 = safe_float_convert(pos1['Strike'])
                        strike2 = safe_float_convert(pos2['Strike'])
                        position1 = safe_float_convert(pos1['Position'])
                        position2 = safe_float_convert(pos2['Position'])
                        
                        if all(v is not None for v in [strike1, strike2, position1, position2]):
                            if (position1 > 0 and position2 < 0) or (position1 < 0 and position2 > 0):
                                if abs(position1) == abs(position2):
                                    spread_type = "Bull" if (strike1 < strike2 and position1 > 0) else "Bear"
                                    spread_type += " Call" if right == "C" else " Put"
                                    
                                    avg_cost1 = safe_float_convert(pos1['AvgCost'])
                                    avg_cost2 = safe_float_convert(pos2['AvgCost'])
                                    net_cost = (avg_cost1 * position1 + avg_cost2 * position2) / abs(position1)
                                    
                                    current1 = safe_float_convert(pos1['CurrentPrice'])
                                    current2 = safe_float_convert(pos2['CurrentPrice'])
                                    current_value = None
                                    market_val = None
                                    unrealized_pnl = None
                                    
                                    if current1 is not None and current2 is not None and current1 > 0 and current2 > 0:
                                        current_value = (current1 * position1 + current2 * position2) / abs(position1)
                                        market_val = current_value * abs(position1) * 100
                                        if net_cost is not None:
                                            unrealized_pnl = (current_value - net_cost) * abs(position1) * 100
                                    
                                    spreads.append({
                                        'Symbol': symbol,
                                        'Description': f"{spread_type} {strike1}/{strike2} {expiry}",
                                        'AvgCost': net_cost,
                                        'CurrentPrice': current_value,
                                        'Position': abs(position1),
                                        'MarketVal': market_val,
                                        'UnrealizedPnL': unrealized_pnl
                                    })
        
        return pd.DataFrame(spreads)
    
    def get_positions_dataframe(self):
        """Convert positions to DataFrame"""
        data = []
        for key, pos in self.positions.items():
            data.append({
                'Symbol': pos['Symbol'],
                'SecType': pos['SecType'],
                'Description': pos['Description'],
                'AvgCost': pos['AvgCost'],
                'CurrentPrice': pos['CurrentPrice'],
                'Position': pos['Position'],
                'MarketVal': pos['MarketVal'],
                'UnrealizedPnL': pos['UnrealizedPnL'],
                'Strike': pos['Strike'],
                'Right': pos['Right'],
                'Expiry': pos['Expiry']
            })
        return pd.DataFrame(data)
    
    def disconnect_tws(self):
        """Disconnect from TWS"""
        if self.isConnected():
            self.disconnect()

: 

: 

In [None]:
project_root = os.getcwd()
config_path = os.path.join(project_root, 'config')
sys.path.insert(0, config_path)

credentials_file = os.path.join(config_path, 'credentials.json')
with open(credentials_file, 'r') as f:
    creds = json.load(f)

pg_creds = creds['database']['postgresql']
pg_creds['host'] = '127.0.0.1'
pg_creds['port'] = 5433

: 

: 

: 

In [None]:
def get_option_strategies():
    """Get option strategies from the database"""
    try:
        conn = psycopg2.connect(
            host=pg_creds['host'],
            port=pg_creds['port'],
            database=pg_creds['database'],
            user=pg_creds['user'],
            password=pg_creds['password']
        )
        
        query = """
        SELECT id, strategy_type, ticker, trigger_price, strike_buy, strike_sell, 
               estimated_premium, options_expiry_date, scrape_date, strategy_status, trade_id
        FROM option_strategies
        WHERE strategy_status IN ('order placed', 'active')
        ORDER BY scrape_date DESC
        """
        
        db_strategies_df = pd.read_sql_query(query, conn)
        conn.close()
        return db_strategies_df
        
    except Exception as e:
        print(f"Database query failed: {e}")
        return pd.DataFrame()

: 

: 

: 

In [None]:
def join_spreads_with_database(spreads_df, db_strategies_df):
    """Join IBKR spreads with database strategies"""
    if spreads_df.empty or db_strategies_df.empty:
        return pd.DataFrame()
    
    joined_data = []
    
    for _, ibkr_row in spreads_df.iterrows():
        description = ibkr_row['Description']
        symbol = ibkr_row['Symbol']
        
        parts = description.split()
        strategy_type = ' '.join(parts[:2])
        strike_info = parts[2]
        expiry = parts[3]
        
        strikes = strike_info.split('/')
        strike1, strike2 = float(strikes[0]), float(strikes[1])
        
        if "Bull" in strategy_type:
            db_strike_buy = min(strike1, strike2)
            db_strike_sell = max(strike1, strike2)
        else:
            db_strike_buy = max(strike1, strike2)
            db_strike_sell = min(strike1, strike2)
        
        expiry_date = f"{expiry[:4]}-{expiry[4:6]}-{expiry[6:]}"
        
        matches = db_strategies_df[
            (db_strategies_df['ticker'] == symbol) &
            (db_strategies_df['strategy_type'] == strategy_type) &
            (db_strategies_df['strike_buy'] == db_strike_buy) &
            (db_strategies_df['strike_sell'] == db_strike_sell) &
            (db_strategies_df['options_expiry_date'].astype(str) == expiry_date)
        ]
        
        for _, db_row in matches.iterrows():
            joined_data.append({
                'ibkr_symbol': symbol,
                'ibkr_description': description,
                'ibkr_avg_cost': ibkr_row['AvgCost'],
                'ibkr_current_price': ibkr_row['CurrentPrice'],
                'ibkr_unrealized_pnl': ibkr_row['UnrealizedPnL'],
                'ibkr_market_val': ibkr_row['MarketVal'],
                'ibkr_position': ibkr_row['Position'],
                'db_id': db_row['id'],
                'db_ticker': db_row['ticker'],
                'db_strategy_type': db_row['strategy_type'],
                'db_estimated_premium': db_row['estimated_premium'],
                'db_trade_id': db_row['trade_id'],
                'premium_difference': ibkr_row['AvgCost'] - db_row['estimated_premium']
            })
    
    return pd.DataFrame(joined_data)

: 

: 

: 

In [None]:
def insert_positions_to_database(joined_df):
    """Insert joined position data into ibkr_positions table"""
    if joined_df.empty:
        return False
    
    try:
        conn = psycopg2.connect(
            host=pg_creds['host'],
            port=pg_creds['port'],
            database=pg_creds['database'],
            user=pg_creds['user'],
            password=pg_creds['password']
        )
        
        cursor = conn.cursor()
        
        for _, row in joined_df.iterrows():
            insert_sql = """
            INSERT INTO ibkr_positions (
                ibkr_symbol, ibkr_description, ibkr_avg_cost, ibkr_current_price,
                ibkr_unrealized_pnl, ibkr_market_val, ibkr_position,
                db_id, db_ticker, db_strategy_type, db_estimated_premium,
                db_trade_id, premium_difference
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (ibkr_symbol, ibkr_description, db_id)
            DO UPDATE SET
                ibkr_current_price = EXCLUDED.ibkr_current_price,
                ibkr_unrealized_pnl = EXCLUDED.ibkr_unrealized_pnl,
                ibkr_market_val = EXCLUDED.ibkr_market_val,
                updated_at = CURRENT_TIMESTAMP
            """
            
            cursor.execute(insert_sql, (
                row['ibkr_symbol'],
                row['ibkr_description'],
                row['ibkr_avg_cost'],
                row['ibkr_current_price'],
                row['ibkr_unrealized_pnl'],
                row['ibkr_market_val'],
                row['ibkr_position'],
                row['db_id'],
                row['db_ticker'],
                row['db_strategy_type'],
                row['db_estimated_premium'],
                row['db_trade_id'],
                row['premium_difference']
            ))
        
        conn.commit()
        cursor.close()
        conn.close()
        return True
        
    except Exception as e:
        print(f"Database insert failed: {e}")
        return False

: 

: 

: 

In [None]:
app = IBKRPositionApp()
connected = app.connectTWS()

if connected:
    # Get positions
    app.get_positions_data()
    
    # Get account updates which includes market values and prices
    account_id = "DU9233079"  # Your paper trading account
    app.get_account_updates(account_id)
    
    # Get dataframes
    positions_df = app.get_positions_dataframe()
    spreads_df = app.find_vertical_spreads(positions_df)
    
    # Get database strategies and join
    db_strategies_df = get_option_strategies()
    
    if not spreads_df.empty and not db_strategies_df.empty:
        joined_df = join_spreads_with_database(spreads_df, db_strategies_df)
        if not joined_df.empty:
            success = insert_positions_to_database(joined_df)
            print(f"Inserted {len(joined_df)} positions to database")
    
    app.disconnect_tws()
else:
    print("Failed to connect to TWS/Gateway")

INFO:ibapi.client:sent startApi
INFO:ibapi.client:REQUEST startApi {}
INFO:ibapi.client:SENDING startApi b'\x00\x00\x00\n71\x002\x00180\x00\x00'
INFO:ibapi.wrapper:ANSWER connectAck {}
INFO:ibapi.wrapper:ANSWER managedAccounts {'accountsList': 'DU9233079'}
INFO:ibapi.wrapper:ANSWER nextValidId {'orderId': 1}
INFO:__main__:Connected to TWS/Gateway on 172.21.240.1:7497
INFO:ibapi.client:REQUEST reqPositions {}
INFO:ibapi.client:SENDING reqPositions b'\x00\x00\x00\x0561\x001\x00'
INFO:__main__:All position data received
INFO:ibapi.client:REQUEST reqAccountUpdates {'subscribe': True, 'acctCode': 'DU9233079'}
INFO:ibapi.client:SENDING reqAccountUpdates b'\x00\x00\x00\x106\x002\x001\x00DU9233079\x00'
INFO:ibapi.wrapper:ANSWER updateAccountTime {'timeStamp': '07:31'}
INFO:ibapi.wrapper:ANSWER updateAccountTime {'timeStamp': '07:31'}
INFO:ibapi.wrapper:ANSWER updateAccountTime {'timeStamp': '07:31'}
INFO:ibapi.wrapper:ANSWER updateAccountTime {'timeStamp': '07:31'}
INFO:ibapi.wrapper:ANSWER up


=== Requesting account updates for DU9233079 ===
Portfolio update for AAPL_OPT_230.0_P_20251031:
  Position: 1.0
  Market Price: 0.86266525
  Market Value: 86.27
  Average Cost: 204.6259
  Unrealized PnL: -118.36
Portfolio update for AAPL_OPT_240.0_P_20251031:
  Position: -1.0
  Market Price: 2.2638068
  Market Value: -226.38
  Average Cost: 418.3713
  Unrealized PnL: 191.99
Portfolio update for AMZN_OPT_195.0_P_20251031:
  Position: 1.0
  Market Price: 2.5087402
  Market Value: 250.87
  Average Cost: 355.7959
  Unrealized PnL: -104.92
Portfolio update for AMZN_OPT_205.0_P_20251031:
  Position: -1.0
  Market Price: 5.1969056
  Market Value: -519.69
  Average Cost: 619.2013
  Unrealized PnL: 99.51
Portfolio update for IWM_OPT_260.0_C_20251031:
  Position: -1.0
  Market Price: 0.3201032
  Market Value: -32.01
  Average Cost: 204.3713
  Unrealized PnL: 172.36
Portfolio update for IWM_OPT_270.0_C_20251031:
  Position: 1.0
  Market Price: 0.06024335
  Market Value: 6.02
  Average Cost: 45.

: 

: 

: 

In [None]:
print("Positions DataFrame:")
positions_df


Positions DataFrame:


Unnamed: 0,Symbol,SecType,Description,AvgCost,CurrentPrice,Position,MarketVal,UnrealizedPnL,Strike,Right,Expiry
0,AAPL,OPT,AAPL 20251031 230.0 P,2.046259,0.862665,1.0,86.27,-118.36,230.0,P,20251031.0
1,AAPL,OPT,AAPL 20251031 240.0 P,4.183713,2.263807,-1.0,-226.38,191.99,240.0,P,20251031.0
2,UNH,OPT,UNH 20251024 390.0 C,5.392013,0.242427,-1.0,-24.24,514.96,390.0,C,20251024.0
3,NKE,OPT,NKE 20251024 60.0 P,0.110359,0.020398,1.0,2.04,-9.0,60.0,P,20251024.0
4,IWM,OPT,IWM 20251031 260.0 C,2.043713,0.320103,-1.0,-32.01,172.36,260.0,C,20251031.0
5,QQQ,OPT,QQQ 20251031 566.0 P,5.269488,2.458576,-1.0,-245.86,281.09,566.0,P,20251031.0
6,NKE,OPT,NKE 20251031 55.0 P,0.046259,0.00997,1.0,1.0,-3.63,55.0,P,20251031.0
7,SPY,OPT,SPY 20251107 620.0 P,4.266259,2.296564,1.0,229.66,-196.97,620.0,P,20251107.0
8,JNJ,OPT,JNJ 20251017 190.0 C,0.0,3.220001,0.0,0.0,0.0,190.0,C,20251017.0
9,QQQ,OPT,QQQ 20251031 556.0 P,3.890484,1.745876,1.0,174.59,-214.46,556.0,P,20251031.0


: 

: 

: 

In [None]:
print("\nSpreads DataFrame:")
spreads_df


Spreads DataFrame:


Unnamed: 0,Symbol,Description,AvgCost,CurrentPrice,Position,MarketVal,UnrealizedPnL
0,AAPL,Bull Put 230.0/240.0 20251031,-2.137454,-1.401142,1.0,-140.114155,73.631245
1,AMZN,Bull Put 195.0/205.0 20251031,-2.634054,-2.688165,1.0,-268.81654,-5.41114
2,IWM,Bear Call 260.0/270.0 20251031,-1.587454,-0.25986,1.0,-25.985985,132.759415
3,META,Bear Put 640.0/630.0 20251024,-1.329254,-0.147226,1.0,-14.722585,118.202815
4,META,Bear Put 640.0/645.0 20251024,-1.481606,0.099989,1.0,9.99893,158.15953
5,META,Bull Put 630.0/655.0 20251024,-1.031607,-0.50953,1.0,-50.95296,52.20774
6,META,Bull Put 645.0/655.0 20251024,-1.183959,-0.262314,1.0,-26.231445,92.164455
7,NKE,Bull Put 60.0/68.0 20251024,-0.909254,-1.421378,1.0,-142.13782,-51.21242
8,NKE,Bull Put 55.0/67.0 20251031,-1.237454,-1.514952,1.0,-151.495165,-27.749765
9,QQQ,Bear Put 566.0/556.0 20251031,-1.379004,-0.7127,1.0,-71.27005,66.63035


: 

: 

: 

In [None]:
db_strategies_df

: 

: 

: 

: 

: 

: 