In [None]:
import websocket
import json
import logging
import pykx as kx

In [None]:
# Logging setup: configure the root logger to emit INFO-and-above records and
# create the module logger used by insert_to_kdb and the WebSocket callbacks.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:

# KDB+ table names and the q insert function (constants).
# Each *_TABLE value is passed to insert_to_kdb as the target table name.
TRADE_TABLE = 'trade'
QUOTE_TABLE = 'quote'
DEPTH_TABLE = 'depth'
CANDLE_TABLE = 'candlestick'
# Name of the q function that insert_to_kdb calls as QFUNC(table_name, row).
# Presumably the kdb+tick tickerplant update entry point -- TODO confirm
# against the q process listening on the configured port.
QFUNC = '.u.upd'

In [None]:

# Binance WebSocket endpoint. No stream is named in the URL itself; the
# streams are requested with a SUBSCRIBE message sent from on_open.
BINANCE_WS_URL = "wss://stream.binance.com:9443/ws"

In [None]:

# ======= Configuration =======
# Both names are module-level state: start_websocket() overwrites them at the
# start of every run, and on_message() reads MAX_MESSAGES and increments
# message_count.
MAX_MESSAGES = None  # Message cap; None means no cap (run_forever mode)
message_count = 0  # Counter for received messages

In [None]:

# ======= Data Processing and Insertion =======

def insert_to_kdb(table_name, row, host='localhost', port=5010):
    """Insert a row into the specified KDB+ table.

    Opens a connection to the q process at ``host:port``, calls ``QFUNC``
    with ``table_name`` and ``row``, and closes the connection again.
    Any failure (connection or remote call) is logged and swallowed so that a
    single bad insert does not interrupt the caller.

    Args:
        table_name: Name of the target table; passed as the first argument
            to ``QFUNC``.
        row: Row payload; passed as the second argument to ``QFUNC``.
        host: Hostname of the q process. Defaults to ``'localhost'``.
        port: Port of the q process. Defaults to ``5010``.

    Returns:
        None.
    """
    try:
        # NOTE(review): a fresh connection is opened for every row. For
        # high-rate streams a long-lived connection would be cheaper, but
        # that needs connection-lifecycle management outside this function.
        with kx.QConnection(host=host, port=port) as qconn:
            qconn(QFUNC, table_name, row)
            logger.info(f"Inserted into {table_name}: {row}")
    except Exception as e:
        # Deliberately best-effort: report the failure and keep going.
        logger.error(f"Failed to insert into {table_name}: {e}")


# ======= WebSocket Callbacks =======

def on_message(ws, message):
    """Handle one incoming WebSocket message and forward it to KDB+.

    The message is decoded as JSON and dispatched on its event type:

    * ``trade``       -> ``(time; sym; price; qty; side)`` into ``TRADE_TABLE``
    * ``bookTicker``  -> ``(time; sym; bid; ask; bid_qty; ask_qty)`` into
      ``QUOTE_TABLE``
    * ``depthUpdate`` -> one ``(time; sym; `BID|`ASK; price; qty)`` row per
      price level into ``DEPTH_TABLE``
    * ``kline``       -> ``(time; sym; interval; open; high; low; close;
      volume)`` into ``CANDLE_TABLE``

    Any other JSON object (for example the subscription acknowledgement) is
    counted but not stored. Messages that are not valid JSON objects are
    logged and skipped without being counted. Once ``MAX_MESSAGES`` (if set)
    is reached, the socket is closed.

    Rows are built as data via ``kx.toq`` rather than by interpolating the
    payload into q source text, so field values received from the network are
    never evaluated as q code.

    Args:
        ws: The WebSocket app object; only ``ws.close()`` is used.
        message: Raw message text received from the socket.
    """
    global message_count

    try:
        data = json.loads(message)
    except ValueError as e:
        logger.warning(f"Ignoring message that is not valid JSON: {e}")
        return
    if not isinstance(data, dict):
        logger.warning(f"Ignoring unexpected {type(data).__name__} payload")
        return

    event = _detect_event(data)

    # Trade Data
    if event == 'trade':
        # Binance's "m" flag means "the buyer is the market maker": when it
        # is true the resting order was the bid and the aggressor sold, so
        # the trade side is SELL (and BUY otherwise).
        side = 'SELL' if data['m'] else 'BUY'
        trade_row = _q_row(
            kx.q(".z.n"),
            data['s'],         # Symbol
            float(data['p']),  # Price
            float(data['q']),  # Quantity
            side,
        )
        insert_to_kdb(TRADE_TABLE, trade_row)

    # Quote Data
    elif event == 'bookTicker':
        quote_row = _q_row(
            kx.q(".z.n"),
            data['s'],         # Symbol
            float(data['b']),  # Best bid price
            float(data['a']),  # Best ask price
            float(data['B']),  # Bid quantity
            float(data['A']),  # Ask quantity
        )
        insert_to_kdb(QUOTE_TABLE, quote_row)

    # Depth Data
    elif event == 'depthUpdate':
        # One timestamp is shared by every level of the same update.
        depth_time = kx.q(".z.n")
        sym = data['s']
        # Bids first, then asks; each level is a [price, qty] pair.
        for book_side, key in (('BID', 'b'), ('ASK', 'a')):
            for level in data[key]:
                depth_row = _q_row(
                    depth_time, sym, book_side, float(level[0]), float(level[1])
                )
                insert_to_kdb(DEPTH_TABLE, depth_row)

    # Candlestick Data
    elif event == 'kline':
        candle = data['k']
        candle_row = _q_row(
            kx.q(".z.n"),
            data['s'],           # Symbol
            candle['i'],         # Interval
            float(candle['o']),  # Open
            float(candle['h']),  # High
            float(candle['l']),  # Low
            float(candle['c']),  # Close
            float(candle['v']),  # Volume
        )
        insert_to_kdb(CANDLE_TABLE, candle_row)

    # Increment the message count
    message_count += 1

    # Check if we need to stop after a specific number of messages
    if MAX_MESSAGES and message_count >= MAX_MESSAGES:
        logger.info(f"Reached {MAX_MESSAGES} messages. Closing WebSocket.")
        ws.close()


def _detect_event(data):
    """Return the event type of a decoded payload, or None if unknown.

    Most streams carry the type in the ``e`` field. The spot book-ticker
    payload has no ``e`` field, so it is recognised by its key set instead.
    """
    event = data.get('e')
    if event is None and all(key in data for key in ('s', 'b', 'B', 'a', 'A')):
        return 'bookTicker'
    return event


def _q_row(timestamp, *fields):
    """Pack a timestamp and field values into a q list without evaluating text.

    With PyKX's default conversions Python ``str`` values become q symbols
    and ``float`` values become q floats.
    """
    return kx.toq([timestamp, *fields])


def on_open(ws):
    """Send the SUBSCRIBE request for the BTCUSDT streams once connected."""
    logger.info("WebSocket connection established. Subscribing to streams...")

    # Streams requested in a single SUBSCRIBE call.
    stream_names = [
        "btcusdt@trade",       # Trade
        "btcusdt@bookTicker",  # Quote
        "btcusdt@depth",       # Depth
        "btcusdt@kline_1m",    # 1-minute candlesticks
    ]
    # Key order (method, params, id) is kept so the serialized text is stable.
    request = dict(method="SUBSCRIBE", params=stream_names, id=1)
    payload = json.dumps(request)

    ws.send(payload)
    logger.info("Subscribed to BTCUSDT streams.")


def on_close(ws, close_status_code, close_msg):
    """Log the status code and message reported when the socket closes."""
    closure_note = f"WebSocket closed: {close_status_code} - {close_msg}"
    logger.info(closure_note)


# ======= Main Function =======

# change the max_messages as per your need
def start_websocket(max_messages=None):
    """Start the WebSocket connection and block until it ends.

    Resets the module-level ``MAX_MESSAGES`` / ``message_count`` state, builds
    a ``WebSocketApp`` for ``BINANCE_WS_URL`` wired to this module's
    callbacks, and runs its event loop.

    Args:
        max_messages: Number of received messages after which ``on_message``
            closes the socket. ``None`` (the default) means no cap.
    """
    global MAX_MESSAGES, message_count
    MAX_MESSAGES = max_messages  # Set max messages dynamically
    message_count = 0  # Reset the counter for this run

    ws = websocket.WebSocketApp(
        BINANCE_WS_URL,
        on_open=on_open,
        on_message=on_message,
        # Without an error callback, connection failures would only surface
        # as an unexplained close.
        on_error=_on_error,
        on_close=on_close,
    )
    ws.run_forever()


def _on_error(ws, error):
    """Log an error reported by the WebSocket client."""
    logger.error(f"WebSocket error: {error}")


In [None]:

if __name__ == "__main__":
    # Unlimited mode (no message cap) -- uncomment to use instead:
    # start_websocket()
    # Capped mode: on_message closes the socket once 1000 messages have
    # been received.
    start_websocket(max_messages=1000)