In [0]:
# Sanity check: list the project-local helper modules that the main cell imports
# (kafka_callbacks, rate_limiter, example_utils). `ls` reports an error for any
# file that is missing from the working directory.
!ls hyperliquid/utils/kafka_callbacks.py
!ls hyperliquid/utils/rate_limiter.py
!ls hyperliquid/utils/example_utils.py


In [0]:
!pip install aiokafka
!pip install kafka-python
dbutils.library.restartPython()


In [0]:

import asyncio
from asyncio import run_coroutine_threadsafe
import os
import json
from datetime import datetime
from typing import Any

from hyperliquid.utils import constants
from hyperliquid.utils import rate_limiter
from hyperliquid.utils import example_utils

from kafka import KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError

# Import our Kafka callbacks
from hyperliquid.utils.kafka_callbacks import (
    ClickHouseFillKafka,
    ClickHouseOrderKafka,
    ClickHouseLiquidationsKafka,
    ClickHouseLiquidationsDevKafka
)

import yaml
import time
# Selects which liquidation callback is instantiated further down:
# True -> ClickHouseLiquidationsDevKafka, False -> ClickHouseLiquidationsKafka.
USE_DEV = True
# Wall-clock time at cell execution, in epoch milliseconds. It is handed to
# backfill_fills() by subscribe_to_address().
start_time = int(time.time() * 1000)

def load_addresses_from_config(path="config_whales.yml") -> list[str]:
    """Read the list of addresses to track from a YAML config file.

    The file is expected to contain a top-level mapping with an ``addresses``
    key holding a list of address strings.

    Args:
        path: Location of the YAML file.

    Returns:
        The configured addresses, or an empty list when the file cannot be
        read/parsed, is empty, is not a mapping, or has no (or a null)
        ``addresses`` entry. This function never raises and never returns None.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort by design: a broken config yields "no addresses" instead of a crash.
        print(f"[ERROR] Failed to load addresses from {path}: {e}")
        return []

    print(f"Loaded addresses from {path}: {data}")

    # safe_load returns None for an empty file and may return a list/scalar for
    # other documents; only a mapping can carry the "addresses" key.
    if not isinstance(data, dict):
        print(f"[ERROR] Failed to load addresses from {path}: expected a top-level mapping")
        return []

    # `addresses:` with no value parses to None; normalise that to an empty list
    # so callers can always iterate the result.
    return data.get("addresses") or []


# Kafka connection settings.
# Credentials must never be committed in the notebook: they are read from the
# environment instead (for example cluster environment variables backed by a
# secret store). Rotate any credential that was previously stored in this file.
def _require_env(name):
    """Return the value of environment variable `name`, failing loudly if it is unset or empty."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name} is not set; it is required to connect to Kafka."
        )
    return value


KAFKA_SERVER = os.environ.get(
    "KAFKA_BOOTSTRAP_SERVERS",
    "cthki8qfdq8asdnsm9gg.any.us-east-1.mpx.prd.cloud.redpanda.com:9092",
)
USERNAME = _require_env("KAFKA_USERNAME")
PASSWORD = _require_env("KAFKA_PASSWORD")

# Lower-case aliases kept because main() passes these names to ensure_topics().
username = USERNAME
password = PASSWORD
bootstrap_servers = KAFKA_SERVER

# Instantiate the fill/order Kafka callbacks
fill_kafka_cb = ClickHouseFillKafka(
    bootstrap=KAFKA_SERVER,
    username=USERNAME,
    password=PASSWORD
)

order_kafka_cb = ClickHouseOrderKafka(
    bootstrap=KAFKA_SERVER,
    username=USERNAME,
    password=PASSWORD
)

# USE_DEV picks the dev liquidation callback over the regular one; both take
# the same connection arguments.
_liquidation_cb_cls = ClickHouseLiquidationsDevKafka if USE_DEV else ClickHouseLiquidationsKafka
liquidation_kafka_cb = _liquidation_cb_cls(
    bootstrap=KAFKA_SERVER,
    username=USERNAME,
    password=PASSWORD
)


# NOTE: this rebinds the name `rate_limiter` from the imported module to a
# limiter instance; the handlers below call methods on the instance.
rate_limiter = rate_limiter.HyperliquidRateLimiter()
# Loop that subscription callbacks schedule their coroutines on.
event_loop = asyncio.get_event_loop()


def ensure_topics(bootstrap_servers, username, password, sasl_mechanism="SCRAM-SHA-256",
                  topics=None, num_partitions=1, replication_factor=3):
    """Create any of the required Kafka topics that do not exist yet.

    Args:
        bootstrap_servers: Kafka bootstrap server address(es).
        username: SASL username.
        password: SASL password.
        sasl_mechanism: SASL mechanism used over SASL_SSL.
        topics: Topic names to ensure. Defaults to
            ``['orders', 'fills', 'liquidations_dev']``.
        num_partitions: Partition count for newly created topics.
        replication_factor: Replication factor for newly created topics.

    Returns:
        None. Progress is reported via print().

    Raises:
        Whatever the admin client raises on connection/listing/creation failures,
        except TopicAlreadyExistsError, which is reported and swallowed.
    """
    wanted = ['orders', 'fills', 'liquidations_dev'] if topics is None else list(topics)

    admin = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        security_protocol="SASL_SSL",
        sasl_mechanism=sasl_mechanism,
        sasl_plain_username=username,
        sasl_plain_password=password,
    )

    # try/finally so the admin connection is released even when listing or
    # creating topics fails.
    try:
        existing_topics = set(admin.list_topics())
        new_topics = [
            NewTopic(name=t, num_partitions=num_partitions, replication_factor=replication_factor)
            for t in wanted
            if t not in existing_topics
        ]

        if not new_topics:
            print("All topics already exist")
            return

        try:
            admin.create_topics(new_topics=new_topics)
            print("Created topics:", [nt.name for nt in new_topics])
        except TopicAlreadyExistsError:
            # Another process may have created a topic between list and create.
            print("Some topics already exist.")
    finally:
        admin.close()


# async def handle_user_fills(msg, address):
#     # Rate limit for each fill event.
#     await rate_limiter.acquire_address_action(address)

#     fills = msg.get("data", {}).get("fills", [])
#     for f in fills:
#         # Prepare fill data for Kafka
#         fill_data = {
#             "exchange": "hyperliquid",
#             "symbol": f.get("coin"),         # was "coin" in the original fill
#             "side": f.get("side"),
#             "price": f.get("px"),
#             "amount": f.get("sz"),
#             "fee": f.get("fee"),
#             "id": f.get("hash"),
#             "order_id": f.get("oid"),
#             "liquidity": f.get("crossed"),   # optional choice if you'd like to store "crossed" under liquidity
#             "type": f.get("dir"),
#             "account": address,
#             "timestamp": f.get("time"),
#             # If desired, you can provide a receipt_timestamp. Here, we omit it or set to None.
#             "raw": f,                       # store the individual fill as raw
#             "raw_data": msg                 # store the entire message for debugging
#         }
#         # Send fill data to Kafka
#         await fill_kafka_cb.write(fill_data)


# Notation reference for Hyperliquid v0 API (nonstandard; subject to change in v1):
# Px   = Price
# Sz   = Size (in units of coin, i.e., base currency)
# Szi  = Signed size (positive for long, negative for short)
# Ntl  = Notional (USD amount, Px * Sz)
# Side = Side of trade/book: B = Bid = Buy, A = Ask = Short (aggressing side for trades)
# Asset = Integer representing the asset being traded
# Tif  = Time in force: GTC = Good 'til canceled, ALO = Add liquidity only, IOC = Immediate or cancel
async def handle_user_fills(msg, address):
    """Publish every fill in a fills message to Kafka.

    Each fill is flattened into a record and written via `fill_kafka_cb`; fills
    that carry a non-empty ``liquidation`` object are additionally written via
    `liquidation_kafka_cb`.

    Example fill (abridged):
        {"coin": "BTC", "px": "79823.0", "sz": "0.01126", "side": "A",
         "time": 1743963674808, "dir": "Close Long", "hash": "0x766b...",
         "oid": 84299475887, "crossed": true, "fee": "0.224701",
         "tid": 602065776560497,
         "liquidation": {"liquidatedUser": "0xa444...", "markPx": "79840.0",
                         "method": "market"}}

    Args:
        msg: Message shaped like ``{"data": {"fills": [fill, ...]}}``. A missing
            or null ``data`` / ``fills`` is treated as "no fills".
        address: Account the fills belong to; stored as ``account`` and used as
            the rate-limiter key.

    Returns:
        None. A failure on one fill is logged and does not stop the others.
    """
    # Rate limit once for the batch of fills
    await rate_limiter.acquire_address_action(address)

    # `.get(key, default)` only covers a *missing* key; a key that is present
    # but null would hand back None and crash before the per-fill try block.
    payload = (msg or {}).get("data") or {}
    fills = payload.get("fills") or []

    for f in fills:
        try:
            ts = f.get("time")
            # Extract liquidation info
            liquidation = f.get("liquidation", {}) or {}
            is_liquidation = 1 if liquidation else 0
            # Non-liquidation fills get sentinel values (-1 / None) for these fields.
            liquidated_user = liquidation.get("liquidatedUser") if is_liquidation else -1
            liquidation_method = liquidation.get("method") if is_liquidation else None
            liquidation_mark_price = liquidation.get("markPx") if is_liquidation else -1
            side = f.get("side")
            normalized_side = {"B": "buy", "A": "sell"}.get(side, side)  # Normalize side to 'buy'/'sell'

            fill_data = {
                "exchange": "hyperliquid",
                "symbol": f.get("coin"),
                "side": normalized_side,
                "price": f.get("px"),
                "amount": f.get("sz"),
                "fee": f.get("fee"),
                "id": f.get("hash"),  # intended as the downstream dedup key
                "order_id": f.get("oid"),
                "liquidity": f.get("crossed"),
                "type": f.get("dir"),
                "account": address,
                "timestamp": ts,
                "is_liquidation": is_liquidation,
                "liquidated_user": liquidated_user,
                "liquidation_method": liquidation_method,
                # Mark price is the price the book reached when the order was
                # sent for liquidation; "price" is what it actually executed at
                # (e.g. mark price 108.69, executed at 108.51).
                "liquidation_mark_price": liquidation_mark_price,
                "raw": f,
                "raw_data": f
            }
            print(f"[DEBUG] Fills data for address {address}: {fill_data}")
            await fill_kafka_cb.write(fill_data)

            if is_liquidation:
                data = {
                    "exchange": "hyperliquid",
                    "symbol": f.get("coin"),
                    "side": normalized_side,
                    "quantity": f.get("sz"),
                    "price": f.get("px"),
                    "id": f.get("hash"),  # intended as the downstream dedup key
                    "status": f.get("dir"),
                    "timestamp": ts,
                    "is_liquidation": is_liquidation,
                    "liquidated_user": liquidated_user,
                    "liquidation_method": liquidation_method,
                    "liquidation_mark_price": liquidation_mark_price,
                    "raw": f,
                    "raw_data": f,
                }
                await liquidation_kafka_cb.write(data)

        except Exception as e:
            # Best-effort per fill: log and keep going with the rest of the batch.
            print(f"[WARN] Failed to process fill for address {address}: {e}")

async def handle_order_updates(msg, address):
    """Publish every order update in an orderUpdates message to Kafka.

    Args:
        msg: Message shaped like ``{"data": [{"order": {...}, "status": ...}, ...]}``.
            A missing or null ``data`` is treated as "no updates".
        address: Account the orders belong to; stored as ``account`` and used
            as the rate-limiter key.

    Returns:
        None. A failure on one update is logged and does not stop the others,
        mirroring handle_user_fills.
    """
    # Rate limit once per incoming message.
    await rate_limiter.acquire_address_action(address)

    for update in (msg or {}).get("data") or []:
        try:
            order = update.get("order") or {}
            side = order.get("side")
            normalized_side = {"B": "buy", "A": "sell"}.get(side, side)

            # Only computed when both sizes are present and numeric. Missing
            # keys now fall back to None; previously a "nan" default slipped
            # through float() and was published as the string "nan".
            # NOTE(review): this is origSz - sz. If `sz` is the still-open size,
            # the difference is the filled quantity rather than the remaining
            # one — confirm against the exchange API docs before relying on it.
            try:
                remaining_amount = float(order["origSz"]) - float(order["sz"])
            except (KeyError, TypeError, ValueError):
                remaining_amount = None

            order_data = {
                "exchange": "hyperliquid",
                "symbol": order.get("coin"),
                "id": order.get("oid"),
                "client_order_id": order.get("cloid"),
                "side": normalized_side,
                "status": update.get("status"),
                "type": "limit" if order.get("limitPx") else "unknown",
                "price": order.get("limitPx"),
                "amount": order.get("sz"),
                "remaining_amount": str(remaining_amount) if remaining_amount is not None else None,
                "timestamp": order.get("timestamp"),
                "account": address,
                "raw": order,
                "raw_data": msg
            }

            print(f"[DEBUG] order_data for address {address}: {order_data}")
            await order_kafka_cb.write(order_data)
        except Exception as e:
            # Best-effort per update: log and continue with the remaining ones.
            print(f"[WARN] Failed to process order update for address {address}: {e}")


async def backfill_fills(address: str, _start_time: int):
    """Replay the last 7 days of historical fills for `address` through handle_user_fills.

    Args:
        address: Account whose fills are fetched.
        _start_time: Currently unused; the window is always "now minus 7 days"
            to "now". Kept so existing callers keep working.

    Returns:
        None. Errors while fetching or replaying are logged, not raised.
    """
    _, info, _ = example_utils.setup(constants.MAINNET_API_URL)

    # 7-day look-back window (in milliseconds)
    lookback_ms = 7 * 86400 * 1000
    end_time = int(time.time() * 1000)
    window_start = end_time - lookback_ms  # deliberately not named `start_time`: that is a module global

    # Dedup on (hash, tid) rather than hash alone, so that distinct fills which
    # share a hash are not silently dropped; only exact repeats are skipped.
    # NOTE(review): presumably `hash` identifies the transaction and `tid` the
    # individual fill — confirm against the exchange API docs.
    seen_fill_keys: set[tuple] = set()

    try:
        # NOTE(review): this is a synchronous call inside a coroutine, so the
        # event loop is blocked while it runs. The endpoint may also cap the
        # number of fills per response — confirm whether pagination is needed.
        fills = info.user_fills_by_time(address, window_start, end_time)
        for fill in fills or []:
            fill_key = (fill.get("hash"), fill.get("tid"))
            if fill_key in seen_fill_keys:
                continue
            seen_fill_keys.add(fill_key)
            # Wrap each fill in the same envelope the live subscription delivers.
            await handle_user_fills({"data": {"fills": [fill]}}, address)
            await asyncio.sleep(0.01)  # throttle per fill
    except Exception as e:
        print(f"[ERROR] backfill_fills failed for {address}: {e}")


async def subscribe_to_address(address: str, include_backfill_fills: bool):
    """Subscribe to fills and order updates for `address` and keep listening.

    On any error the whole sequence (client setup, optional backfill,
    subscriptions) is retried after a 3 second pause. The coroutine only ends
    when it is cancelled.

    Args:
        address: Account to follow.
        include_backfill_fills: When true, backfill_fills() runs before the
            live subscriptions are registered (on every attempt).
    """
    # Schedule callback work on the loop that is actually running this
    # coroutine, rather than on a loop object captured at import time.
    loop = asyncio.get_running_loop()

    # Retry in a loop instead of recursing, so repeated failures do not keep
    # nesting coroutine frames.
    while True:
        try:
            _, info, _ = example_utils.setup(constants.MAINNET_API_URL)

            await rate_limiter.acquire_address_action(address)

            if include_backfill_fills:
                await backfill_fills(address, start_time)

            # The callbacks are handed to info.subscribe(); they pass the
            # handler coroutine to the event loop in a thread-safe way
            # (presumably because the client invokes them off the loop thread).
            info.subscribe({"type": "userFills", "user": address},
                           lambda msg: run_coroutine_threadsafe(handle_user_fills(msg, address), loop))
            info.subscribe({"type": "orderUpdates", "user": address},
                           lambda msg: run_coroutine_threadsafe(handle_order_updates(msg, address), loop))

            print(f"[{address}] Subscribed. Listening...")

            # Keep this coroutine alive while callbacks do the work.
            while True:
                await asyncio.sleep(5)

        except Exception as e:
            # Cancellation is not an Exception subclass on current Python
            # versions, so cancelling the task still stops it.
            # NOTE(review): the previous `info` client is not explicitly closed
            # before retrying — confirm whether it needs to be.
            print(f"[{address}] ERROR: {e}")
            await asyncio.sleep(3)

async def main():
    """Ensure the Kafka topics exist, then subscribe to every configured address.

    Addresses are started in batches of BATCH_SIZE with a pause between
    batches. Each subscription runs as its own task: subscribe_to_address()
    never returns on its own, so awaiting a batch to completion would prevent
    any later batch from ever starting.

    Returns:
        None. Returns immediately when no addresses are configured; otherwise
        runs until cancelled.
    """
    INCLUDE_BACKFILL_FILLS = True
    ensure_topics(bootstrap_servers, username, password)

    addresses = load_addresses_from_config()
    if not addresses:
        print("[WARN] No addresses found in config.")
        return

    BATCH_SIZE = 10
    DELAY_BETWEEN_BATCHES = 15  # seconds

    # Ceiling division, so an exact multiple of BATCH_SIZE is not over-counted.
    total_batches = -(-len(addresses) // BATCH_SIZE)

    tasks = []
    try:
        for batch_no, offset in enumerate(range(0, len(addresses), BATCH_SIZE), start=1):
            batch = addresses[offset:offset + BATCH_SIZE]
            print(f"[INFO] Subscribing to batch {batch_no} of {total_batches}")
            tasks.extend(
                asyncio.create_task(subscribe_to_address(addr, INCLUDE_BACKFILL_FILLS))
                for addr in batch
            )
            # Space out batches; nothing to wait for after the last one.
            if batch_no < total_batches:
                await asyncio.sleep(DELAY_BETWEEN_BATCHES)

        # Keep main() alive for as long as the subscriptions run.
        await asyncio.gather(*tasks)
    finally:
        # If main() is cancelled or fails, do not leave subscriptions running.
        for task in tasks:
            task.cancel()


# if __name__ == "__main__":
#     asyncio.run(main())

# Top-level `await` is used instead of the asyncio.run() entry point above; it
# only works where the host supports awaiting at cell level (e.g. a notebook
# kernel). With at least one configured address this call is expected to keep
# running until the cell is interrupted.
await main()




In [0]:


# async def main():
#     addresses = [
#         "0x0000a0ab1b620e79fa089a16c89c5eeee490f2da",
#         "0x007d76eec0ba411ce873a8819df50dd443d967a0",
#         "0x023a3d058020fb76cca98f01b3c48c8938a22355",
#         "0x03ca2d85b85dc1d61243cc2932382dbc6285fbda",
#         "0x06cecfbac34101ae41c88ebc2450f8602b3d164b",
#         "0x091144e651b334341eabdbbbfed644ad0100023e",
#         "0x0a170cdb6eb5a46c15cffd727c8659f2f371f478",
#         "0x0a97692d91fa9195a52da81cca2051c032c5347d",
#     ]

#     await asyncio.gather(*[subscribe_to_address(addr) for addr in addresses])

# async def main():
#     INCLUDE_BACKFILL_FILLS = True
#     ensure_topics(bootstrap_servers, username, password, sasl_mechanism="SCRAM-SHA-256")

#     addresses = load_addresses_from_config()
#     # addresses = ['0xa44481a6454f4FD0899e261Aa941323f2b11A09b']
#     if not addresses:
#         print("[WARN] No addresses found in config.")
#         return

#     await asyncio.gather(*[
#         subscribe_to_address(addr, INCLUDE_BACKFILL_FILLS)
#         for addr in addresses
#     ])