# LIGHTER BUY SELL

In [14]:
# lighter_buy_sell_notional.py
import os
import math
import requests
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

BASE_URL = "https://mainnet.zklighter.elliot.ai"


def _to_int(x: Any) -> Optional[int]:
    if x is None:
        return None
    if isinstance(x, bool):
        return int(x)
    if isinstance(x, int):
        return x
    if isinstance(x, float):
        return int(x)
    if isinstance(x, str):
        s = x.strip()
        if s == "":
            return None
        try:
            return int(s)
        except ValueError:
            try:
                return int(float(s))
            except ValueError:
                return None
    return None


def _to_str(x: Any) -> Optional[str]:
    if x is None:
        return None
    if isinstance(x, str):
        return x
    return str(x)


def _extract_list(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    for k in ["trades", "data", "items", "results"]:
        v = resp.get(k)
        if isinstance(v, list):
            return [x for x in v if isinstance(x, dict)]
    for outer in ["data", "result"]:
        ov = resp.get(outer)
        if isinstance(ov, dict):
            for k in ["trades", "items", "results"]:
                v = ov.get(k)
                if isinstance(v, list):
                    return [x for x in v if isinstance(x, dict)]
    return []


def _extract_next_cursor(resp: Dict[str, Any]) -> Optional[str]:
    for k in ["next_cursor", "nextCursor"]:
        v = resp.get(k)
        if isinstance(v, str) and v:
            return v

    cur = resp.get("cursor")
    if isinstance(cur, str) and cur:
        return cur
    if isinstance(cur, dict):
        for kk in ["next", "next_cursor", "nextCursor"]:
            v = cur.get(kk)
            if isinstance(v, str) and v:
                return v

    meta = resp.get("meta")
    if isinstance(meta, dict):
        for kk in ["next_cursor", "cursor", "next"]:
            v = meta.get(kk)
            if isinstance(v, str) and v:
                return v
    return None


# ---- Trade/Order 구조(Go struct) 기반 파서 ----
def _get_trade_price_size(item: Dict[str, Any]) -> Tuple[Optional[int], Optional[int]]:
    """
    Go struct 기준 Trade: { p: uint32, s: int64 }
    응답이 Trade 단독일 수도 있고, OrderExecution 같이 nested일 수도 있어서 둘 다 커버.
    """
    # 1) flat trade
    p = _to_int(item.get("p"))
    s = _to_int(item.get("s"))
    if p is not None and s is not None:
        return p, s

    # 2) nested trade: { t: { p, s } }
    t = item.get("t")
    if isinstance(t, dict):
        p2 = _to_int(t.get("p"))
        s2 = _to_int(t.get("s"))
        if p2 is not None and s2 is not None:
            return p2, s2

    # 3) 다른 키명 fallback (혹시라도)
    p3 = _to_int(item.get("price"))
    s3 = _to_int(item.get("size"))
    if p3 is not None and s3 is not None:
        return p3, s3

    return None, None


def _get_is_ask(item: Dict[str, Any]) -> Optional[int]:
    """
    매수/매도 판별: Order struct의 IsAsk (ia) 기반.
    trades 응답이 OrderExecution이면 maker/taker order 안에 ia가 있을 수 있음.
    """
    # 1) flat
    for k in ["ia", "is_ask", "isAsk"]:
        v = _to_int(item.get(k))
        if v is not None:
            return 1 if v != 0 else 0

    # 2) maker/taker order 내
    for order_key in ["mo", "to", "maker_order", "taker_order", "makerOrder", "takerOrder"]:
        o = item.get(order_key)
        if isinstance(o, dict):
            v = _to_int(o.get("ia"))
            if v is None:
                v = _to_int(o.get("is_ask")) if "is_ask" in o else _to_int(o.get("isAsk"))
            if v is not None:
                return 1 if v != 0 else 0

    return None


def _notional_from_int_price_size(
    price_int: int,
    size_int: int,
    price_decimals: int,
    size_decimals: int,
) -> float:
    """
    SDK 문서: price/base_amount는 정수로 전달, 시장별 decimals는 orderBookDetails로 조회.
    - price_int는 10^price_decimals 스케일
    - size_int는 10^size_decimals 스케일
    - quote notional = (price_int * size_int) / 10^(price_decimals + size_decimals)
    """
    denom = 10 ** (price_decimals + size_decimals)
    return (price_int * abs(size_int)) / denom


@dataclass
class LighterClient:
    base_url: str = BASE_URL
    authorization: Optional[str] = None
    timeout_sec: int = 30

    @property
    def headers(self) -> Dict[str, str]:
        h = {"accept": "application/json"}
        if self.authorization:
            h["authorization"] = self.authorization
        return h

    # ----- accounts -----
    def accounts_by_l1_address(self, l1_address: str) -> Dict[str, Any]:
        r = requests.get(
            f"{self.base_url}/api/v1/accountsByL1Address",
            params={"l1_address": l1_address},
            headers=self.headers,
            timeout=self.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    def extract_account_indices(self, payload: Dict[str, Any]) -> List[int]:
        idxs: List[int] = []

        def add(v: Any):
            iv = _to_int(v)
            if iv is not None:
                idxs.append(iv)

        # SDK 예시: resp.sub_accounts[0].index
        # REST 응답은 형태가 다를 수 있어 list 형태를 넓게 커버
        for list_key in ["sub_accounts", "subAccounts", "accounts", "data", "items", "results"]:
            arr = payload.get(list_key)
            if isinstance(arr, list):
                for item in arr:
                    if isinstance(item, dict):
                        for k in ["index", "account_index", "accountIndex", "id"]:
                            if k in item:
                                add(item[k])

        for k in ["index", "account_index", "accountIndex", "id"]:
            if k in payload:
                add(payload[k])

        seen = set()
        uniq = []
        for x in idxs:
            if x not in seen:
                seen.add(x)
                uniq.append(x)
        return uniq

    def resolve_account_indices_from_l1(self, l1_address: str) -> List[int]:
        payload = self.accounts_by_l1_address(l1_address)
        idxs = self.extract_account_indices(payload)
        if not idxs:
            raise KeyError("accountsByL1Address 응답에서 account_index를 찾지 못했습니다. 응답 JSON 1개만 붙여주면 고정 가능합니다.")
        return idxs

    # ----- market precision -----
    def order_book_details(self, market_id: int = 255, filter_: str = "all") -> Dict[str, Any]:
        r = requests.get(
            f"{self.base_url}/api/v1/orderBookDetails",
            params={"market_id": int(market_id), "filter": filter_},
            headers=self.headers,
            timeout=self.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    def get_market_decimals(self, market_id: int) -> Tuple[int, int]:
        """
        market별 price/size decimals를 orderBookDetails에서 뽑음.
        실제 응답 키명은 환경에 따라 다를 수 있어 후보 키들을 탐색.
        """
        d = self.order_book_details(market_id=market_id)

        # 응답이 단일 마켓 dict일 수도 / 리스트일 수도 있음
        candidates: List[Dict[str, Any]] = []
        if isinstance(d, dict):
            if isinstance(d.get("data"), list):
                candidates = [x for x in d["data"] if isinstance(x, dict)]
            elif isinstance(d.get("order_books"), list):
                candidates = [x for x in d["order_books"] if isinstance(x, dict)]
            else:
                candidates = [d]
        elif isinstance(d, list):
            candidates = [x for x in d if isinstance(x, dict)]

        # market_id 매칭되는 항목 찾기
        chosen: Optional[Dict[str, Any]] = None
        for it in candidates:
            mid = _to_int(it.get("market_id")) or _to_int(it.get("marketId")) or _to_int(it.get("m"))
            if mid == market_id:
                chosen = it
                break
        if chosen is None and candidates:
            chosen = candidates[0]

        if chosen is None:
            raise KeyError("orderBookDetails 응답 파싱 실패")

        # 문서 설명 기반 후보 키
        size_dec = _to_int(chosen.get("supported_size_decimals")) or _to_int(chosen.get("size_decimals")) or _to_int(chosen.get("sd"))
        price_dec = _to_int(chosen.get("supported_price_decimals")) or _to_int(chosen.get("price_decimals")) or _to_int(chosen.get("pd"))

        if size_dec is None or price_dec is None:
            raise KeyError("orderBookDetails에서 price/size decimals를 찾지 못했습니다. 해당 응답 JSON을 붙여주면 키를 고정해줄게요.")

        return int(price_dec), int(size_dec)

    # ----- trades -----
    def trades_page(
        self,
        *,
        sort_by: str,                 # 필수: block_height|timestamp|trade_id
        limit: int,                   # 필수: 1..100
        auth: Optional[str] = None,
        market_id: int = 255,
        account_index: int = -1,
        order_index: Optional[int] = None,
        sort_dir: str = "desc",
        cursor: Optional[str] = None,
        from_: int = -1,
        ask_filter: int = -1,
        role: str = "all",            # all|maker|taker
        type_: str = "all",           # all|trade|liquidation|deleverage|market-settlement
        aggregate: bool = False,
    ) -> Dict[str, Any]:
        url = f"{self.base_url}/api/v1/trades"
        params: Dict[str, Any] = {
            "market_id": int(market_id),
            "account_index": int(account_index),
            "sort_by": sort_by,
            "sort_dir": sort_dir,
            "from": int(from_),
            "ask_filter": int(ask_filter),
            "role": role,
            "type": type_,
            "limit": int(limit),
            "aggregate": str(aggregate).lower(),
        }
        if auth is not None:
            params["auth"] = auth
        if order_index is not None:
            params["order_index"] = int(order_index)
        if cursor:
            params["cursor"] = cursor

        r = requests.get(url, params=params, headers=self.headers, timeout=self.timeout_sec)
        r.raise_for_status()
        return r.json()

    def aggregate_account_buy_sell_notional(
        self,
        *,
        account_index: int,
        market_id: int = 255,
        auth: Optional[str] = None,
        type_: str = "trade",
        role: str = "all",
        sort_by: str = "timestamp",
        sort_dir: str = "desc",
        limit: int = 100,
        max_pages: int = 500,
        from_: int = -1,
        ask_filter: int = -1,
        aggregate_flag: bool = False,
        price_decimals: Optional[int] = None,
        size_decimals: Optional[int] = None,
    ) -> Dict[str, Any]:
        # market_id=255(전체)일 때는 market별 decimals가 달라서 정확 스케일링이 어려움
        # 정확히 하려면 market_id를 개별로 호출하거나, trades row에서 market_id를 읽어서 per-market decimals를 쓰면 됨.
        if market_id != 255 and (price_decimals is None or size_decimals is None):
            price_decimals, size_decimals = self.get_market_decimals(market_id)

        buy_notional = 0.0
        sell_notional = 0.0
        unknown_rows = 0

        cursor: Optional[str] = None

        for _ in range(max_pages):
            resp = self.trades_page(
                auth=auth,
                market_id=market_id,
                account_index=account_index,
                order_index=None,
                sort_by=sort_by,
                sort_dir=sort_dir,
                cursor=cursor,
                from_=from_,
                ask_filter=ask_filter,
                role=role,
                type_=type_,
                limit=limit,
                aggregate=aggregate_flag,
            )

            rows = _extract_list(resp)
            if not rows:
                break

            for row in rows:
                p_int, s_int = _get_trade_price_size(row)
                is_ask = _get_is_ask(row)

                # market_id=255이면 row에서 market_id를 읽어서 decimals를 잡는 방식으로 정확도를 올릴 수 있음
                if market_id == 255:
                    mid = _to_int(row.get("m")) or _to_int(row.get("market_id")) or _to_int(row.get("marketId"))
                    if mid is None:
                        unknown_rows += 1
                        continue
                    try:
                        pd, sd = self.get_market_decimals(int(mid))
                    except Exception:
                        unknown_rows += 1
                        continue
                else:
                    pd, sd = int(price_decimals), int(size_decimals)

                if p_int is None or s_int is None or is_ask is None:
                    unknown_rows += 1
                    continue

                notional = _notional_from_int_price_size(p_int, s_int, pd, sd)

                if is_ask == 1:
                    sell_notional += notional
                else:
                    buy_notional += notional

            next_cursor = _extract_next_cursor(resp)
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

        return {
            "account_index": account_index,
            "market_id": market_id,
            "buy_notional": buy_notional,
            "sell_notional": sell_notional,
            "net_notional": buy_notional - sell_notional,
            "unknown_trade_rows": unknown_rows,
        }

    def aggregate_l1_buy_sell_notional(
        self,
        *,
        l1_address: str,
        market_id: int = 255,
        auth: Optional[str] = None,
        type_: str = "trade",
        role: str = "all",
        sort_by: str = "timestamp",
        sort_dir: str = "desc",
        limit: int = 100,
        max_pages: int = 500,
        from_: int = -1,
        ask_filter: int = -1,
        aggregate_flag: bool = False,
    ) -> Dict[str, Any]:
        indices = self.resolve_account_indices_from_l1(l1_address)

        total_buy = 0.0
        total_sell = 0.0
        total_unknown = 0
        per_account: List[Dict[str, Any]] = []

        for idx in indices:
            r = self.aggregate_account_buy_sell_notional(
                account_index=idx,
                market_id=market_id,
                auth=auth,
                type_=type_,
                role=role,
                sort_by=sort_by,
                sort_dir=sort_dir,
                limit=limit,
                max_pages=max_pages,
                from_=from_,
                ask_filter=ask_filter,
                aggregate_flag=aggregate_flag,
            )
            per_account.append(r)
            total_buy += r["buy_notional"]
            total_sell += r["sell_notional"]
            total_unknown += r["unknown_trade_rows"]

        return {
            "l1_address": l1_address,
            "account_indices": indices,
            "market_id": market_id,
            "buy_notional": total_buy,
            "sell_notional": total_sell,
            "net_notional": total_buy - total_sell,
            "unknown_trade_rows": total_unknown,
            "per_account": per_account,
        }

    

if __name__ == "__main__":
    
    import os

    authorization = os.getenv("None")  # 필요없으면 None
    c = LighterClient(authorization=authorization)

    l1 = "0x15943C988cc42c34f677B2EB79ec8DCBE064BEfa"  # 대상 지갑주소
    
    result = c.aggregate_l1_buy_sell_notional(
        l1_address=l1,
        market_id=255,      # 전체 마켓(spot+perp 전체)
        type_="trade",      # 일반 체결만
        role="all",
        sort_by="timestamp",
        sort_dir="desc",
        limit=100,
        max_pages=200,
    )

    print("BUY(매수금액):", result["buy_notional"])
    print("SELL(매도금액):", result["sell_notional"])
    print("NET(순매수):", result["net_notional"])
    print("계정별:", result["per_account"])



TypeError: aggregate_l1_buy_sell_notional() missing 1 required keyword-only argument: 'l1_address'

# 1) L1 지갑주소로 보기

L1 주소 하나에 여러 sub-account 가 붙을 수 있음

In [2]:
if __name__ == "__main__":
    import os

    authorization = os.getenv(None)  # 필요없으면 None
    c = LighterClient(authorization=authorization)

    l1 = "0x123..."  # 대상 지갑주소
    result = c.aggregate_l1_buy_sell_notional(
        l1_address=l1,
        market_id=255,      # 전체 마켓(spot+perp 전체)
        type_="trade",      # 일반 체결만
        role="all",
        sort_by="timestamp",
        sort_dir="desc",
        limit=100,
        max_pages=200,
    )

    print("BUY(매수금액):", result["buy_notional"])
    print("SELL(매도금액):", result["sell_notional"])
    print("NET(순매수):", result["net_notional"])
    print("계정별:", result["per_account"])


{"status":200,"network_id":1,"timestamp":1770548710}


In [3]:
import asyncio
import lighter

BASE_URL = "https://mainnet.zklighter.elliot.ai"

ACCOUNT_INDEX = 12345        # 본인 account_index
API_KEY_INDEX = 2            # API key index (2~254)
PRIVATE_KEY = "0xabc..."     # API private key

async def main():
    client = lighter.SignerClient(
        url=BASE_URL,
        api_private_keys={API_KEY_INDEX: PRIVATE_KEY},
        account_index=ACCOUNT_INDEX,
    )

    # auth token 생성 (1시간 유효)
    auth_token, err = client.create_auth_token_with_expiry(
        deadline=3600,  # seconds (max 8 hours)
        api_key_index=API_KEY_INDEX,
    )

    if err:
        raise Exception(err)

    print("AUTH TOKEN:")
    print(auth_token)

    await client.close()

asyncio.run(main())


{"code":200,"l1_providers":[{"chainId":1,"networkId":1,"latestBlockNumber":0}],"l1_providers_health":true,"validator_info":[],"contract_addresses":[{"name":"ZkLighterContract","address":"0x3B4D794a66304F130a4Db8F2551B0070dfCf5ca7"},{"name":"USDCContract","address":"0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"}],"latest_l1_generic_block":0,"latest_l1_governance_block":0,"latest_l1_desert_block":0}


In [9]:
import asyncio
import logging
import time
import eth_account
import lighter
from utils import save_api_key_config

logging.basicConfig(level=logging.DEBUG)

# this is a dummy private key registered on Testnet.
# It serves as a good example
BASE_URL = "https://testnet.zklighter.elliot.ai"
ETH_PRIVATE_KEY = "1234567812345678123456781234567812345678123456781234567812345678"
API_KEY_INDEX = 3
NUM_API_KEYS = 5

# If you set this to something other than None, the script will use that account index instead of using the master account index.
# This is useful if you have multiple accounts on the same L1 address or are the owner of a public pool.
# You need to use the private key associated to the master account or the owner of the public pool to change the API keys.
ACCOUNT_INDEX = None

async def main():
    # verify that the account exists & fetch account index
    api_client = lighter.ApiClient(configuration=lighter.Configuration(host=BASE_URL))
    eth_acc = eth_account.Account.from_key(ETH_PRIVATE_KEY)
    eth_address = eth_acc.address

    if ACCOUNT_INDEX is not None:
        account_index = ACCOUNT_INDEX
    else:
        try:
            response = await lighter.AccountApi(api_client).accounts_by_l1_address(l1_address=eth_address)
        except lighter.ApiException as e:
            if e.data.message == "account not found":
                print(f"error: account not found for {eth_address}")
                return
            else:
                raise e

        if len(response.sub_accounts) > 1:
            for sub_account in response.sub_accounts:
                print(f"found accountIndex: {sub_account.index}")

            account = min(response.sub_accounts, key=lambda x: int(x.index))
            account_index = account.index
            print(f"multiple accounts found, using the master account {account_index}")
        else:
            account_index = response.sub_accounts[0].index


    # create a private/public key pair for the new API key
    # pass any string to be used as seed for create_api_key like
    # create_api_key("Hello world random seed to make things more secure")

    private_keys = {}
    public_keys = []

    for i in range(NUM_API_KEYS):
        private_key, public_key, err = lighter.create_api_key()
        if err is not None:
            raise Exception(err)
        public_keys.append(public_key)
        private_keys[API_KEY_INDEX + i] = private_key

    tx_client = lighter.SignerClient(
        url=BASE_URL,
        account_index=account_index,
        api_private_keys=private_keys,
    )

    # change all API keys
    for i in range(NUM_API_KEYS):
        response, err = await tx_client.change_api_key(
            eth_private_key=ETH_PRIVATE_KEY,
            new_pubkey=public_keys[i],
            api_key_index=API_KEY_INDEX + i
        )
        if err is not None:
            raise Exception(err)

    # wait some time so that we receive the new API key in the response
    time.sleep(10)

    # check that the API key changed on the server
    err = tx_client.check_client()
    if err is not None:
        raise Exception(err)

    save_api_key_config(BASE_URL, account_index, private_keys)

    await tx_client.close()
    await api_client.close()


if __name__ == "__main__":
    asyncio.run(main())



ContextualVersionConflict: (cytoolz 0.9.0.1 (/Users/sangjinlee/opt/anaconda3/lib/python3.7/site-packages), Requirement.parse('cytoolz>=0.10.1; implementation_name == "cpython"'), {'eth-utils'})

In [10]:
from lighter_buy_sell_notional import LighterClient

auth = "ro:713400:all:2085908149:867cb518050c178237f17adfedc72056559b8bdf632898387a23b138a1019031"

c = LighterClient(authorization=auth)

result = c.aggregate_account_buy_sell_notional(
    account_index=713400,
    market_id=255
)

print(result)


ModuleNotFoundError: No module named 'lighter_buy_sell_notional'

In [15]:
RO_TOKEN = "ro:713400:all:2085908149:867cb518050c178237f17adfedc72056559b8bdf632898387a23b138a1019031"
BASE_URL = "https://mainnet.zklighter.elliot.ai"

In [16]:
# 토큰에서 account_index 자동 추출

def parse_ro_token(ro_token: str) -> int:
    # 형식: ro:{account_index}:{single|all}:{expiry}:{random}
    parts = ro_token.split(":")
    if len(parts) < 5 or parts[0] != "ro":
        raise ValueError("read-only token 형식(ro:...)이 아닙니다.")
    return int(parts[1])

ACCOUNT_INDEX = parse_ro_token(RO_TOKEN)
ACCOUNT_INDEX


713400

In [22]:
# requests가 되는지 “trades 1페이지” 바로 찍어보기

import requests

params = {
    "sort_by": "timestamp",
    "limit": 5,
    "account_index": ACCOUNT_INDEX,
    "market_id": 255,
    "type": "trade",
    "role": "all",
}

r = requests.get(
    f"{BASE_URL}/api/v1/trades",
    params=params,
    headers={"accept": "application/json", "authorization": RO_TOKEN},
    timeout=30,
)

print("status:", r.status_code)
print(r.text[:1000])  # 너무 길면 앞부분만


status: 200
{"code":200,"next_cursor":"eyJpbmRleCI6MTM4NzMzNDIzMzB9","trades":[{"trade_id":13873420613,"tx_hash":"bcbc951bbc5227d070210e7e0cf6d37af81aee7ef97806efd1d45f201beda80945fcb00a0966aaed","type":"trade","market_id":0,"size":"0.0034","price":"2103.00","usd_amount":"7.150200","ask_id":281476320154672,"bid_id":562948626860318,"ask_client_id":177054897153630,"bid_client_id":0,"ask_account_id":674018,"bid_account_id":713400,"is_maker_ask":false,"block_height":171608576,"timestamp":1770548971811,"taker_position_size_before":"11.6469","taker_entry_quote_before":"24408.906558","taker_initial_margin_fraction_before":500,"maker_position_size_before":"0.0400","maker_entry_quote_before":"84.120000","maker_initial_margin_fraction_before":1250,"transaction_time":1770548971846313},{"trade_id":13873420581,"tx_hash":"9a30fcbe128b8617d43f0beccfa5c2b06662f5f3185bf578ff7e97926c0bb83d0ce53b3f5954e77a","type":"trade","market_id":0,"size":"0.0400","price":"2103.00","usd_amount":"84.120000","ask_id":2

In [23]:
# 매수/매도 금액 집계


import requests
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

def _to_int(x: Any) -> Optional[int]:
    if x is None:
        return None
    if isinstance(x, bool):
        return int(x)
    if isinstance(x, int):
        return x
    if isinstance(x, float):
        return int(x)
    if isinstance(x, str):
        s = x.strip()
        if s == "":
            return None
        try:
            return int(s)
        except ValueError:
            try:
                return int(float(s))
            except ValueError:
                return None
    return None

def _extract_list(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    for k in ["trades", "data", "items", "results"]:
        v = resp.get(k)
        if isinstance(v, list):
            return [x for x in v if isinstance(x, dict)]
    for outer in ["data", "result"]:
        ov = resp.get(outer)
        if isinstance(ov, dict):
            for k in ["trades", "items", "results"]:
                v = ov.get(k)
                if isinstance(v, list):
                    return [x for x in v if isinstance(x, dict)]
    return []

def _extract_next_cursor(resp: Dict[str, Any]) -> Optional[str]:
    for k in ["next_cursor", "nextCursor"]:
        v = resp.get(k)
        if isinstance(v, str) and v:
            return v
    cur = resp.get("cursor")
    if isinstance(cur, str) and cur:
        return cur
    if isinstance(cur, dict):
        for kk in ["next", "next_cursor", "nextCursor"]:
            v = cur.get(kk)
            if isinstance(v, str) and v:
                return v
    meta = resp.get("meta")
    if isinstance(meta, dict):
        for kk in ["next_cursor", "cursor", "next"]:
            v = meta.get(kk)
            if isinstance(v, str) and v:
                return v
    return None

def _get_trade_price_size(item: Dict[str, Any]) -> Tuple[Optional[int], Optional[int]]:
    # Trade: { p: uint32, s: int64 } 또는 { t: { p, s } }
    p = _to_int(item.get("p"))
    s = _to_int(item.get("s"))
    if p is not None and s is not None:
        return p, s
    t = item.get("t")
    if isinstance(t, dict):
        p2 = _to_int(t.get("p"))
        s2 = _to_int(t.get("s"))
        if p2 is not None and s2 is not None:
            return p2, s2
    # fallback
    p3 = _to_int(item.get("price"))
    s3 = _to_int(item.get("size"))
    if p3 is not None and s3 is not None:
        return p3, s3
    return None, None

def _get_is_ask(item: Dict[str, Any]) -> Optional[int]:
    # Order.IsAsk: ia
    for k in ["ia", "is_ask", "isAsk"]:
        v = _to_int(item.get(k))
        if v is not None:
            return 1 if v != 0 else 0
    # maker/taker order 안에 있을 수도 있음
    for order_key in ["mo", "to", "maker_order", "taker_order", "makerOrder", "takerOrder"]:
        o = item.get(order_key)
        if isinstance(o, dict):
            v = _to_int(o.get("ia"))
            if v is None:
                v = _to_int(o.get("is_ask")) if "is_ask" in o else _to_int(o.get("isAsk"))
            if v is not None:
                return 1 if v != 0 else 0
    return None

def _notional_from_int_price_size(price_int: int, size_int: int, price_decimals: int, size_decimals: int) -> float:
    denom = 10 ** (price_decimals + size_decimals)
    return (price_int * abs(size_int)) / denom

@dataclass
class LighterClient:
    base_url: str
    authorization: Optional[str] = None
    timeout_sec: int = 30

    @property
    def headers(self) -> Dict[str, str]:
        h = {"accept": "application/json"}
        if self.authorization:
            h["authorization"] = self.authorization
        return h

    def order_book_details(self, market_id: int = 255, filter_: str = "all") -> Dict[str, Any]:
        r = requests.get(
            f"{self.base_url}/api/v1/orderBookDetails",
            params={"market_id": int(market_id), "filter": filter_},
            headers=self.headers,
            timeout=self.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    def get_market_decimals(self, market_id: int) -> Tuple[int, int]:
        d = self.order_book_details(market_id=market_id)

        candidates: List[Dict[str, Any]] = []
        if isinstance(d, dict):
            if isinstance(d.get("data"), list):
                candidates = [x for x in d["data"] if isinstance(x, dict)]
            elif isinstance(d.get("order_books"), list):
                candidates = [x for x in d["order_books"] if isinstance(x, dict)]
            else:
                candidates = [d]
        elif isinstance(d, list):
            candidates = [x for x in d if isinstance(x, dict)]

        chosen: Optional[Dict[str, Any]] = None
        for it in candidates:
            mid = _to_int(it.get("market_id")) or _to_int(it.get("marketId")) or _to_int(it.get("m"))
            if mid == market_id:
                chosen = it
                break
        if chosen is None and candidates:
            chosen = candidates[0]
        if chosen is None:
            raise KeyError("orderBookDetails 응답 파싱 실패")

        size_dec = _to_int(chosen.get("supported_size_decimals")) or _to_int(chosen.get("size_decimals")) or _to_int(chosen.get("sd"))
        price_dec = _to_int(chosen.get("supported_price_decimals")) or _to_int(chosen.get("price_decimals")) or _to_int(chosen.get("pd"))
        if size_dec is None or price_dec is None:
            raise KeyError("orderBookDetails에서 decimals를 찾지 못했습니다. 응답 JSON을 확인해 키를 고정해야 합니다.")
        return int(price_dec), int(size_dec)

    def trades_page(
        self,
        *,
        sort_by: str,
        limit: int,
        market_id: int = 255,
        account_index: int = -1,
        sort_dir: str = "desc",
        cursor: Optional[str] = None,
        from_: int = -1,
        ask_filter: int = -1,
        role: str = "all",
        type_: str = "trade",
        aggregate: bool = False,
    ) -> Dict[str, Any]:
        params: Dict[str, Any] = {
            "market_id": int(market_id),
            "account_index": int(account_index),
            "sort_by": sort_by,
            "sort_dir": sort_dir,
            "from": int(from_),
            "ask_filter": int(ask_filter),
            "role": role,
            "type": type_,
            "limit": int(limit),
            "aggregate": str(aggregate).lower(),
        }
        if cursor:
            params["cursor"] = cursor

        r = requests.get(
            f"{self.base_url}/api/v1/trades",
            params=params,
            headers=self.headers,
            timeout=self.timeout_sec,
        )
        r.raise_for_status()
        return r.json()

    def aggregate_account_buy_sell_notional(
        self,
        *,
        account_index: int,
        market_id: int = 255,
        sort_by: str = "timestamp",
        sort_dir: str = "desc",
        limit: int = 100,
        max_pages: int = 50,
        type_: str = "trade",
        role: str = "all",
        from_: int = -1,
        ask_filter: int = -1,
    ) -> Dict[str, Any]:
        buy_notional = 0.0
        sell_notional = 0.0
        unknown_rows = 0

        cursor: Optional[str] = None

        for _ in range(max_pages):
            resp = self.trades_page(
                sort_by=sort_by,
                limit=limit,
                market_id=market_id,
                account_index=account_index,
                sort_dir=sort_dir,
                cursor=cursor,
                from_=from_,
                ask_filter=ask_filter,
                role=role,
                type_=type_,
                aggregate=False,
            )

            rows = _extract_list(resp)
            if not rows:
                break

            for row in rows:
                p_int, s_int = _get_trade_price_size(row)
                is_ask = _get_is_ask(row)

                # market_id=255이면 row에서 market_id 읽어서 per-market decimals 적용
                if market_id == 255:
                    mid = _to_int(row.get("m")) or _to_int(row.get("market_id")) or _to_int(row.get("marketId"))
                    if mid is None:
                        unknown_rows += 1
                        continue
                    try:
                        pd, sd = self.get_market_decimals(int(mid))
                    except Exception:
                        unknown_rows += 1
                        continue
                else:
                    pd, sd = self.get_market_decimals(market_id)

                if p_int is None or s_int is None or is_ask is None:
                    unknown_rows += 1
                    continue

                notional = _notional_from_int_price_size(p_int, s_int, pd, sd)
                if is_ask == 1:
                    sell_notional += notional
                else:
                    buy_notional += notional

            next_cursor = _extract_next_cursor(resp)
            if not next_cursor or next_cursor == cursor:
                break
            cursor = next_cursor

        return {
            "account_index": account_index,
            "market_id": market_id,
            "buy_notional": buy_notional,
            "sell_notional": sell_notional,
            "net_notional": buy_notional - sell_notional,
            "unknown_trade_rows": unknown_rows,
            "pages_fetched": max_pages,
            "limit_per_page": limit,
        }


In [24]:
# 토큰으로 실행

c = LighterClient(base_url=BASE_URL, authorization=RO_TOKEN)

result = c.aggregate_account_buy_sell_notional(
    account_index=ACCOUNT_INDEX,
    market_id=255,     # 전체 마켓
    limit=100,
    max_pages=10,      # 테스트용(늘리면 더 많이 집계)
)

result


{'account_index': 713400,
 'market_id': 255,
 'buy_notional': 0.0,
 'sell_notional': 0.0,
 'net_notional': 0.0,
 'unknown_trade_rows': 6,
 'pages_fetched': 10,
 'limit_per_page': 100}

In [25]:
print("BUY(매수금액):", result["buy_notional"])
print("SELL(매도금액):", result["sell_notional"])
print("NET(순매수):", result["net_notional"])
print("unknown_trade_rows:", result["unknown_trade_rows"])


BUY(매수금액): 0.0
SELL(매도금액): 0.0
NET(순매수): 0.0
unknown_trade_rows: 6


In [26]:
import pandas as pd

resp = c.trades_page(
    sort_by="timestamp",
    limit=20,
    market_id=255,
    account_index=ACCOUNT_INDEX,
    type_="trade"
)

rows = _extract_list(resp)
pd.DataFrame(rows).head()


Unnamed: 0,trade_id,tx_hash,type,market_id,size,price,usd_amount,ask_id,bid_id,ask_client_id,...,taker_entry_quote_before,taker_initial_margin_fraction_before,maker_position_size_before,maker_entry_quote_before,maker_initial_margin_fraction_before,transaction_time,maker_position_sign_changed,taker_position_sign_changed,maker_fee,taker_fee
0,13873420613,bcbc951bbc5227d070210e7e0cf6d37af81aee7ef97806...,trade,0,0.0034,2103.0,7.1502,281476320154672,562948626860318,177054897153630,...,24408.906558,500,0.04,84.12,1250,1770548971846313,,,,
1,13873420581,9a30fcbe128b8617d43f0beccfa5c2b06662f5f3185bf5...,trade,0,0.04,2103.0,84.12,281476320154668,562948626860318,177054897151244,...,24492.73627,500,0.0,0.0,1250,1770548971827574,True,,,
2,13873374101,f1aaa604ec2447ded6ea7e10b3bc1de60cb1d9df12702c...,trade,0,0.0817,2102.77,171.796309,281476320149934,562948626861820,0,...,171.990065,1250,3.4331,7223.388701,3333,1770548876059163,,True,20.0,
3,13873342745,eecd34a89901ec43f46b90a626b69385134871be716abc...,trade,0,0.0272,2103.69,57.220368,281476320146823,562948626876260,232983147186661,...,4712.619646,3333,0.0545,114.769697,1250,1770548820263147,,,,200.0
4,13873342330,a6f83549178b9f989e752335efe2dbb446e00a35ccaa8a...,trade,0,0.0001,2103.69,0.210369,281476320146723,562948626876260,3513,...,95.209497,500,0.0544,114.559328,1250,1770548820187961,,,,


In [27]:
RO_TOKEN = "ro:713400:all:2085908149:867cb518050c178237f17adfedc72056559b8bdf632898387a23b138a1019031"
BASE_URL = "https://mainnet.zklighter.elliot.ai"

ACCOUNT_INDEX = 713400


In [28]:
# trades 1페이지 가져오기

import requests

params = {
    "sort_by": "timestamp",
    "limit": 5,
    "account_index": ACCOUNT_INDEX,
    "market_id": 255,
    "type": "trade",
    "role": "all",
}

r = requests.get(
    f"{BASE_URL}/api/v1/trades",
    params=params,
    headers={"accept": "application/json", "authorization": RO_TOKEN},
    timeout=30,
)

print("status:", r.status_code)
data = r.json()
data.keys(), data.get("next_cursor"), len(data.get("trades", []))


status: 200


(dict_keys(['code', 'next_cursor', 'trades']),
 'eyJpbmRleCI6MTM4NzMzNDIzMzB9',
 5)

In [29]:
# 매수/매도 금액 합산

from typing import Optional, Dict, Any, List, Tuple

def _to_float(x: Any) -> Optional[float]:
    if x is None:
        return None
    if isinstance(x, (int, float)):
        return float(x)
    if isinstance(x, str):
        try:
            return float(x)
        except ValueError:
            return None
    return None

def fetch_trades_page(
    *,
    base_url: str,
    token: str,
    account_index: int,
    limit: int = 100,
    cursor: Optional[str] = None,
    market_id: int = 255,
    sort_by: str = "timestamp",
    sort_dir: str = "desc",
    type_: str = "trade",
    role: str = "all",
) -> Dict[str, Any]:
    params = {
        "sort_by": sort_by,
        "sort_dir": sort_dir,
        "limit": limit,
        "account_index": account_index,
        "market_id": market_id,
        "type": type_,
        "role": role,
    }
    if cursor:
        params["cursor"] = cursor

    r = requests.get(
        f"{base_url}/api/v1/trades",
        params=params,
        headers={"accept": "application/json", "authorization": token},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()

def aggregate_buy_sell_usd(
    *,
    base_url: str,
    token: str,
    account_index: int,
    market_id: int = 255,
    max_pages: int = 50,
    limit: int = 100,
) -> Dict[str, Any]:
    buy_usd = 0.0
    sell_usd = 0.0
    unknown = 0
    rows_seen = 0

    cursor = None

    for _ in range(max_pages):
        resp = fetch_trades_page(
            base_url=base_url,
            token=token,
            account_index=account_index,
            market_id=market_id,
            limit=limit,
            cursor=cursor,
        )

        trades = resp.get("trades", [])
        if not isinstance(trades, list) or len(trades) == 0:
            break

        for t in trades:
            if not isinstance(t, dict):
                unknown += 1
                continue

            usd = _to_float(t.get("usd_amount"))
            ask_acc = t.get("ask_account_id")
            bid_acc = t.get("bid_account_id")

            if usd is None:
                # 혹시 usd_amount가 없으면 price*size로 대체(필요시)
                price = _to_float(t.get("price"))
                size = _to_float(t.get("size"))
                if price is None or size is None:
                    unknown += 1
                    continue
                usd = abs(price * size)

            # 매수/매도 판별
            if bid_acc == account_index:
                buy_usd += usd
            elif ask_acc == account_index:
                sell_usd += usd
            else:
                # account_index 필터가 들어갔는데도 여기 걸리면 응답 구조가 다른 케이스
                unknown += 1

            rows_seen += 1

        next_cursor = resp.get("next_cursor")
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor

    return {
        "account_index": account_index,
        "market_id": market_id,
        "buy_usd": buy_usd,
        "sell_usd": sell_usd,
        "net_usd": buy_usd - sell_usd,
        "rows_seen": rows_seen,
        "unknown_rows": unknown,
        "last_cursor": cursor,
    }


In [30]:
result = aggregate_buy_sell_usd(
    base_url=BASE_URL,
    token=RO_TOKEN,
    account_index=ACCOUNT_INDEX,
    market_id=255,
    max_pages=20,  # 테스트: 20페이지(최대 2000 trades)
    limit=100,
)

result

{'account_index': 713400,
 'market_id': 255,
 'buy_usd': 263.260265,
 'sell_usd': 171.796309,
 'net_usd': 91.463956,
 'rows_seen': 6,
 'unknown_rows': 0,
 'last_cursor': None}

In [31]:
print("BUY USD:", result["buy_usd"])
print("SELL USD:", result["sell_usd"])
print("NET USD:", result["net_usd"])
print("rows_seen:", result["rows_seen"], "unknown:", result["unknown_rows"])


BUY USD: 263.260265
SELL USD: 171.796309
NET USD: 91.463956
rows_seen: 6 unknown: 0


In [37]:
import pandas as pd

resp = fetch_trades_page(
    base_url=BASE_URL,
    token=RO_TOKEN,
    account_index=ACCOUNT_INDEX,
    limit=50,
    market_id=255
)

df = pd.DataFrame(resp["trades"])
df["datetime"] = (pd.to_datetime(df["timestamp"], unit="ms", utc=True).dt.tz_convert("Asia/Seoul"))
df[["timestamp","market_id","price","size","usd_amount","ask_account_id","bid_account_id","is_maker_ask"]]


Unnamed: 0,timestamp,market_id,price,size,usd_amount,ask_account_id,bid_account_id,is_maker_ask
0,1770548971811,0,2103.0,0.0034,7.1502,674018,713400,False
1,1770548971811,0,2103.0,0.04,84.12,674018,713400,False
2,1770548875952,0,2102.77,0.0817,171.796309,713400,688859,False
3,1770548820228,0,2103.69,0.0272,57.220368,688859,713400,False
4,1770548820186,0,2103.69,0.0001,0.210369,712096,713400,False
5,1770548719370,0,2105.87,0.0544,114.559328,712963,713400,True


In [60]:
import requests
import pandas as pd

RO_TOKEN = "ro:6183:single:1778333113:5ad34daa8a140e08dd8d1ee3f07bc7d20c305975122acbb8fe682cd659d9b594"
BASE_URL = "https://mainnet.zklighter.elliot.ai"
ACCOUNT_INDEX = 6183


params = {
    "sort_by": "timestamp",
    "sort_dir": "desc",
    "limit": 50,
    "account_index": ACCOUNT_INDEX,
    "market_id": 255,
    "type": "liquidation",   # ✅ 여기만 변경
    "role": "all",
}

r = requests.get(
    f"{BASE_URL}/api/v1/trades",
    params=params,
    headers={"accept":"application/json", "authorization": RO_TOKEN},
    timeout=30,
)
print("status:", r.status_code)

data = r.json()
df_liq = pd.DataFrame(data.get("trades", []))
df_liq.head()


status: 200


Unnamed: 0,trade_id,tx_hash,type,market_id,size,price,usd_amount,ask_id,bid_id,ask_client_id,...,taker_position_size_before,taker_entry_quote_before,taker_initial_margin_fraction_before,taker_position_sign_changed,maker_position_size_before,maker_entry_quote_before,maker_initial_margin_fraction_before,transaction_time,maker_fee,maker_position_sign_changed
0,304595212,000000042a5659520000019a1fdab1e000000000000000...,liquidation,0,0.0001,4009.98,0.400998,281475375768293,562949581631914,0,...,-0.0001,0.395187,200,True,0.493,1977.50188,500,0,,
1,272156163,00000003d14892c30000019a06e2e01000000000000000...,liquidation,0,0.0014,3875.46,5.425644,281475327794332,562949619740294,0,...,0.0014,5.480062,200,True,3.5809,13883.649047,500,0,20.0,
2,268940752,00000003ca4548b70000019a04e697ab00000000000000...,liquidation,0,0.0184,3920.0,72.128,281475323990225,562949624007089,0,...,0.0184,72.899696,200,True,-1.2763,5076.534302,200,0,20.0,
3,265228777,00000003c1762dc90000019a0256b24300000000000000...,liquidation,0,0.234,4005.89,937.37826,281475318677571,562949629032257,0,...,0.234,947.150181,200,True,2469.8164,9879360.754982,500,0,20.0,
4,265228776,00000003c1762dc80000019a0256b24300000000000000...,liquidation,0,0.02,4005.9,80.118,281475318677571,562949628430228,0,...,0.254,1028.103189,200,,0.36,1446.8867,500,0,,


In [43]:
# RO_TOKEN = "ro:713400:all:2085908149:867cb518050c178237f17adfedc72056559b8bdf632898387a23b138a1019031"
# BASE_URL = "https://mainnet.zklighter.elliot.ai"
# ACCOUNT_INDEX = 713400

# FX_KRW_PER_USD = 1300.0  # 적용환율(원/달러) - 원하는 값으로 바꾸면 됨


RO_TOKEN = "ro:6183:single:1778333113:5ad34daa8a140e08dd8d1ee3f07bc7d20c305975122acbb8fe682cd659d9b594"
BASE_URL = "https://mainnet.zklighter.elliot.ai"
ACCOUNT_INDEX = 6183

FX_KRW_PER_USD = 1300.0  # 적용환율(원/달러) - 원하는 값으로 바꾸면 됨


In [57]:
import requests
import pandas as pd
from typing import Optional, Dict, Any, List

def fetch_trades_page(limit=100, cursor: Optional[str]=None) -> Dict[str, Any]:
    params = {
        "sort_by": "timestamp",
        "sort_dir": "desc",
        "limit": limit,
        "account_index": ACCOUNT_INDEX,
        "market_id": 255,
        "type": "trade",
        "role": "all",
    }
    if cursor:
        params["cursor"] = cursor

    r = requests.get(
        f"{BASE_URL}/api/v1/trades",
        params=params,
        headers={"accept": "application/json", "authorization": RO_TOKEN},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()

def fetch_trades(max_pages=2000, limit=100) -> pd.DataFrame:
    rows: List[Dict[str, Any]] = []
    cursor = None
    for _ in range(max_pages):
        resp = fetch_trades_page(limit=limit, cursor=cursor)
        trades = resp.get("trades", [])
        if not trades:
            break
        rows.extend(trades)
        cursor = resp.get("next_cursor")
        if not cursor:
            break
    return pd.DataFrame(rows)

df_trades = fetch_trades(max_pages=10, limit=100)  # 필요 시 max_pages 늘리기
df_trades.head()


Unnamed: 0,trade_id,tx_hash,type,market_id,size,price,usd_amount,ask_id,bid_id,ask_client_id,...,taker_position_size_before,taker_entry_quote_before,taker_position_sign_changed,maker_position_size_before,maker_entry_quote_before,maker_position_sign_changed,transaction_time,maker_fee,taker_initial_margin_fraction_before,maker_initial_margin_fraction_before
0,797434188,1f46c7df487ed65be0cb3e9671fd4280880aec57f17c8b...,trade,2049,988.15,2.3562,2328.27903,577023702281970440,577305177208086783,0,...,0.0,0.0,True,0.0,0.0,True,1768213099621559,,,
1,797434184,b1a605545aabfbe43c69d17554b6408b7a705be74a825e...,trade,2049,860.11,2.3562,2026.591182,577023702281970440,577305177208086785,0,...,0.0,0.0,True,0.0,0.0,True,1768213099451457,,,
2,797434183,0d3d3154f029a67597d19da529731966535f8cc8d988ff...,trade,2049,708.18,2.3562,1668.613716,577023702281970440,577305177208086786,0,...,0.0,0.0,True,0.0,0.0,True,1768213099451404,,,
3,797434182,506b77ef4939e25418e74d3c7f0722eb42e3ba8d39d090...,trade,2049,1243.87,2.3562,2930.806494,577023702281970440,577305177208086787,0,...,0.0,0.0,True,0.0,0.0,True,1768213099451213,,,
4,797434180,b279075c4412b07f2b4e8975a33d5f11eef4e36d489a8b...,trade,2049,39.2,2.3562,92.36304,577023702281970440,577305177208086788,0,...,0.0,0.0,True,0.0,0.0,True,1768213099123153,,,


In [58]:
def fetch_order_books() -> Any:
    r = requests.get(
        f"{BASE_URL}/api/v1/orderBooks",
        params={"market_id": 255, "filter": "all"},
        headers={"accept": "application/json"},
        timeout=30,
    )
    r.raise_for_status()
    return r.json()

def build_market_pair_map() -> Dict[int, str]:
    ob = fetch_order_books()

    # orderBooks 응답이 dict/list 둘 다 올 수 있어서 방어적으로 처리
    candidates = []
    if isinstance(ob, dict):
        if isinstance(ob.get("order_books"), list):
            candidates = ob["order_books"]
        elif isinstance(ob.get("data"), list):
            candidates = ob["data"]
        else:
            candidates = [ob]
    elif isinstance(ob, list):
        candidates = ob

    pair_map: Dict[int, str] = {}

    for it in candidates:
        if not isinstance(it, dict):
            continue

        mid = it.get("market_id") or it.get("marketId") or it.get("m") or it.get("id")
        try:
            mid = int(mid)
        except Exception:
            continue

        # 페어 문자열 후보 키들 (가능한 걸 우선 사용)
        for k in ["symbol", "pair", "name", "market", "ticker"]:
            v = it.get(k)
            if isinstance(v, str) and len(v) >= 3:
                pair_map[mid] = v
                break

        # 못 찾으면 base/quote로 조합 시도
        if mid not in pair_map:
            base = it.get("base_symbol") or it.get("base") or it.get("baseAsset") or it.get("base_asset")
            quote = it.get("quote_symbol") or it.get("quote") or it.get("quoteAsset") or it.get("quote_asset")
            if isinstance(base, str) and isinstance(quote, str):
                pair_map[mid] = f"{base}-{quote}"

    return pair_map

pair_map = build_market_pair_map()
pair_map.get(0), len(pair_map)


(None, 134)

In [59]:
def classify_side(row) -> str:
    # 내 계정이 bid_account_id면 매수, ask_account_id면 매도
    if row.get("bid_account_id") == ACCOUNT_INDEX:
        return "매수"
    if row.get("ask_account_id") == ACCOUNT_INDEX:
        return "매도"
    return "기타"

def base_currency_from_pair(pair: str) -> str:
    # "ETH-USDC" -> "ETH"
    if isinstance(pair, str) and "-" in pair:
        return pair.split("-")[0]
    return ""

out = df_trades.copy()

# 1) datetime (KST)
out["일시"] = pd.to_datetime(out["timestamp"], unit="ms", utc=True).dt.tz_convert("Asia/Seoul")
out["일시"] = out["일시"].dt.strftime("%Y-%m-%d-%H-%M-%S")

# 2) 거래소
out["거래소"] = "Lighter"

# 3) 유형(매수/매도)
out["유형"] = out.apply(classify_side, axis=1)

# 4) 페어 (market_id -> pair)
out["페어"] = out["market_id"].map(pair_map).fillnaconfirm?
# NOTE: fillna fallback
out["페어"] = (
    out["market_id"]
    .map(pair_map)
    .fillna(out["market_id"].apply(lambda x: f"market_{x}"))
)

# 5) 통화 (base asset)
out["통화"] = out["페어"].apply(base_currency_from_pair)

# 6) 가격 (float)
out["가격"] = out["price"].astype(float)

# 7) 원화가치 = usd_amount * 환율
out["usd_amount"] = out["usd_amount"].astype(float)
out["원화가치"] = out["usd_amount"] * float(FX_KRW_PER_USD)

# 8) 적용환율
out["적용환율"] = float(FX_KRW_PER_USD)

# 최종 컬럼만
final_df = out[["일시", "거래소", "유형", "페어", "통화", "가격", "원화가치", "적용환율"]].copy()

final_df


Object `fillnaconfirm` not found.


Unnamed: 0,일시,거래소,유형,페어,통화,가격,원화가치,적용환율
0,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,3.026763e+06,1300.0
1,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,2.634569e+06,1300.0
2,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,2.169198e+06,1300.0
3,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,3.810048e+06,1300.0
4,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,1.200720e+05,1300.0
...,...,...,...,...,...,...,...,...
995,2025-05-24-10-27-06,Lighter,매도,BNB,,662.2797,9.987178e+05,1300.0
996,2025-05-23-20-51-58,Lighter,매도,BTC,,109459.2000,2.808942e+06,1300.0
997,2025-05-23-20-49-55,Lighter,매수,BTC,,109769.9000,2.816915e+06,1300.0
998,2025-05-23-20-49-21,Lighter,매수,TRUMP,,13.3860,2.697279e+05,1300.0


In [69]:
import requests
import pandas as pd
from typing import Optional, Dict, Any, List

# ====== 설정 ======
RO_TOKEN = "ro:6183:single:1778333113:5ad34daa8a140e08dd8d1ee3f07bc7d20c305975122acbb8fe682cd659d9b594"
BASE_URL = "https://mainnet.zklighter.elliot.ai"
ACCOUNT_INDEX = 6183
FX_KRW_PER_USD = 1300.0
L1_ADDRESS = '0x6DbAF85a8Bb5753EB39CD5c4653BEE164f211987'

# SESSION = requests.Session()
# SESSION.headers.update({"accept": "application/json", "authorization": RO_TOKEN})

SESSION = requests.Session()
SESSION.headers.update({"accept": "application/json"})  # ✅ authorization 제거


# ====== 유틸 ======
def _get(url: str, params: Dict[str, Any], timeout: int = 30) -> Dict[str, Any]:
    # ✅ history류는 auth를 params로 받는 경우가 많아서 항상 넣어줌(있으면 덮어쓰지 않음)
    params = dict(params or {})
    params.setdefault("auth", RO_TOKEN)

    r = SESSION.get(url, params=params, timeout=timeout)
    r.raise_for_status()
    return r.json()

def _extract_next_cursor(resp: Dict[str, Any]) -> Optional[str]:
    # trades에서 확인된 키: next_cursor
    for k in ["next_cursor", "nextCursor"]:
        v = resp.get(k)
        if isinstance(v, str) and v:
            return v
    # 일부 endpoint는 cursor를 그대로 next로 주기도 함
    v = resp.get("cursor")
    if isinstance(v, str) and v:
        return v
    if isinstance(v, dict):
        for kk in ["next", "next_cursor", "nextCursor"]:
            vv = v.get(kk)
            if isinstance(vv, str) and vv:
                return vv
    return None

def _to_dt_kst_from_ms(ts_ms: Any) -> Optional[pd.Timestamp]:
    try:
        return pd.to_datetime(ts_ms, unit="ms", utc=True).tz_convert("Asia/Seoul")
    except Exception:
        return None

def _safe_float(x: Any) -> Optional[float]:
    try:
        if x is None:
            return None
        return float(x)
    except Exception:
        return None


# ====== 1) trades (trade + liquidation) ======
def fetch_trades_page(limit=100, cursor: Optional[str] = None, type_: str = "all") -> Dict[str, Any]:
    params = {
        "sort_by": "timestamp",
        "sort_dir": "desc",
        "limit": limit,
        "account_index": ACCOUNT_INDEX,
        "market_id": 255,
        "type": type_,   # ✅ all로 가져오면 trade+liquidation 같이 옴
        "role": "all",
    }
    if cursor:
        params["cursor"] = cursor
    return _get(f"{BASE_URL}/api/v1/trades", params)

def fetch_trades(max_pages=50, limit=100, type_: str = "all") -> pd.DataFrame:
    rows: List[Dict[str, Any]] = []
    cursor = None
    for _ in range(max_pages):
        resp = fetch_trades_page(limit=limit, cursor=cursor, type_=type_)
        trades = resp.get("trades", [])
        if not isinstance(trades, list) or not trades:
            break
        rows.extend(trades)
        cursor = _extract_next_cursor(resp)
        if not cursor:
            break
    return pd.DataFrame(rows)

def fetch_order_books() -> Any:
    return _get(
        f"{BASE_URL}/api/v1/orderBooks",
        params={"market_id": 255, "filter": "all"},
    )

def build_market_pair_map() -> Dict[int, str]:
    ob = fetch_order_books()
    candidates = []
    if isinstance(ob, dict):
        if isinstance(ob.get("order_books"), list):
            candidates = ob["order_books"]
        elif isinstance(ob.get("data"), list):
            candidates = ob["data"]
        else:
            candidates = [ob]
    elif isinstance(ob, list):
        candidates = ob

    pair_map: Dict[int, str] = {}
    for it in candidates:
        if not isinstance(it, dict):
            continue
        mid = it.get("market_id") or it.get("marketId") or it.get("m") or it.get("id")
        try:
            mid = int(mid)
        except Exception:
            continue

        # 가능한 키 후보
        for k in ["symbol", "pair", "name", "market", "ticker"]:
            v = it.get(k)
            if isinstance(v, str) and v:
                pair_map[mid] = v
                break

        # base/quote 조합 시도
        if mid not in pair_map:
            base = it.get("base_symbol") or it.get("base") or it.get("baseAsset") or it.get("base_asset")
            quote = it.get("quote_symbol") or it.get("quote") or it.get("quoteAsset") or it.get("quote_asset")
            if isinstance(base, str) and isinstance(quote, str) and base and quote:
                pair_map[mid] = f"{base}-{quote}"

    return pair_map

pair_map = build_market_pair_map()

def base_currency_from_pair(pair: str) -> str:
    if isinstance(pair, str) and "-" in pair:
        return pair.split("-")[0]
    return ""

def classify_trade_row(row: pd.Series) -> str:
    # liquidation은 별도 라벨
    if row.get("type") == "liquidation":
        return "청산"
    # 일반 trade는 bid/ask account로 매수/매도
    if row.get("bid_account_id") == ACCOUNT_INDEX:
        return "매수"
    if row.get("ask_account_id") == ACCOUNT_INDEX:
        return "매도"
    return "기타"

def trades_to_final_df(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return pd.DataFrame(columns=["일시","거래소","유형","페어","통화","가격","원화가치","적용환율","_sort_ts"])

    out = df.copy()

    # timestamp(ms) -> datetime(KST)
    out["_dt"] = out["timestamp"].apply(_to_dt_kst_from_ms)
    out["_sort_ts"] = out["_dt"].astype("datetime64[ns, Asia/Seoul]")

    out["일시"] = out["_dt"].dt.strftime("%Y-%m-%d-%H-%M-%S")
    out["거래소"] = "Lighter"
    out["유형"] = out.apply(classify_trade_row, axis=1)

    out["페어"] = (
        out["market_id"]
        .map(pair_map)
        .fillna(out["market_id"].apply(lambda x: f"market_{x}"))
    )
    out["통화"] = out["페어"].apply(base_currency_from_pair)

    out["가격"] = out["price"].apply(_safe_float)
    out["usd_amount"] = out["usd_amount"].apply(_safe_float)

    out["적용환율"] = float(FX_KRW_PER_USD)
    out["원화가치"] = out["usd_amount"].fillna(0.0) * out["적용환율"]

    final_df = out[["일시","거래소","유형","페어","통화","가격","원화가치","적용환율","_sort_ts"]].copy()
    return final_df


# ====== 2) 입출금/이체 history (가능한 범위) ======
def try_get_l1_address_from_account() -> Optional[str]:
    """
    deposit/history가 l1_address를 요구해서,
    account?by=index로 l1_address를 얻을 수 있는지 시도.
    (응답 키는 환경마다 달라서 후보 키들을 탐색)
    """
    try:
        resp = _get(
            f"{BASE_URL}/api/v1/account",
            params={"by": "index", "value": str(ACCOUNT_INDEX)},
        )
    except Exception:
        return None

    # 가능한 키 후보들
    candidates = []
    if isinstance(resp, dict):
        candidates.append(resp)
        if isinstance(resp.get("data"), dict):
            candidates.append(resp["data"])
        if isinstance(resp.get("account"), dict):
            candidates.append(resp["account"])

    for obj in candidates:
        if not isinstance(obj, dict):
            continue
        for k in ["l1_address","l1Address","owner","owner_address","eth_address","ethAddress","address"]:
            v = obj.get(k)
            if isinstance(v, str) and v.startswith("0x") and len(v) >= 10:
                return v
    return None

def fetch_transfer_history(max_pages=50, cursor: Optional[str]=None) -> pd.DataFrame:
    rows = []
    cur = cursor
    for _ in range(max_pages):
        resp = _get(
            f"{BASE_URL}/api/v1/transfer/history",
            params={"account_index": ACCOUNT_INDEX, "cursor": cur} if cur else {"account_index": ACCOUNT_INDEX},
        )
        items = resp.get("transfers") or resp.get("data") or resp.get("items") or resp.get("results") or []
        if not isinstance(items, list) or not items:
            break
        rows.extend(items)
        nxt = _extract_next_cursor(resp)
        if not nxt or nxt == cur:
            break
        cur = nxt
    return pd.DataFrame(rows)

def fetch_withdraw_history(max_pages=50, cursor: Optional[str]=None, filter_: str="all") -> pd.DataFrame:
    rows = []
    cur = cursor
    for _ in range(max_pages):
        params = {"account_index": ACCOUNT_INDEX, "filter": filter_}
        if cur:
            params["cursor"] = cur
        resp = _get(f"{BASE_URL}/api/v1/withdraw/history", params=params)
        items = resp.get("withdraws") or resp.get("withdrawals") or resp.get("data") or resp.get("items") or resp.get("results") or []
        if not isinstance(items, list) or not items:
            break
        rows.extend(items)
        nxt = _extract_next_cursor(resp)
        if not nxt or nxt == cur:
            break
        cur = nxt
    return pd.DataFrame(rows)

def fetch_deposit_history(l1_address: str, max_pages=50, cursor: Optional[str]=None, filter_: str="all") -> pd.DataFrame:
    rows = []
    cur = cursor
    for _ in range(max_pages):
        params = {"account_index": ACCOUNT_INDEX, "l1_address": l1_address, "filter": filter_}
        if cur:
            params["cursor"] = cur
        resp = _get(f"{BASE_URL}/api/v1/deposit/history", params=params)
        items = resp.get("deposits") or resp.get("data") or resp.get("items") or resp.get("results") or []
        if not isinstance(items, list) or not items:
            break
        rows.extend(items)
        nxt = _extract_next_cursor(resp)
        if not nxt or nxt == cur:
            break
        cur = nxt
    return pd.DataFrame(rows)

def history_to_events_df(df: pd.DataFrame, event_type_kr: str) -> pd.DataFrame:
    """
    응답 구조가 확정이 아니라서 후보 키를 넓게 잡고,
    final_df 스키마(일시/거래소/유형/페어/통화/가격/원화가치/적용환율)에 맞춰 최소한으로 정규화.
    """
    if df.empty:
        return pd.DataFrame(columns=["일시","거래소","유형","페어","통화","가격","원화가치","적용환율","_sort_ts"])

    x = df.copy()

    # 시간 후보 키
    ts_col = None
    for k in ["timestamp","time","created_at","createdAt","block_timestamp","tx_time","transaction_time"]:
        if k in x.columns:
            ts_col = k
            break

    if ts_col is not None:
        # ms인지 sec인지 모르면 보통 ms가 많아서 우선 ms 시도 후 실패 시 sec
        def to_dt(v):
            dt = _to_dt_kst_from_ms(v)
            if dt is not None:
                return dt
            try:
                return pd.to_datetime(v, unit="s", utc=True).tz_convert("Asia/Seoul")
            except Exception:
                return None
        x["_dt"] = x[ts_col].apply(to_dt)
    else:
        x["_dt"] = None

    x["_sort_ts"] = x["_dt"].astype("datetime64[ns, Asia/Seoul]")
    x["일시"] = x["_dt"].dt.strftime("%Y-%m-%d-%H-%M-%S")

    # 자산/금액 후보 키
    asset_col = None
    for k in ["asset","asset_symbol","token","symbol","currency","coin"]:
        if k in x.columns:
            asset_col = k
            break
    amt_col = None
    for k in ["usd_amount","amount_usd","amountUsd","amount","amt","value","size","quantity","qty"]:
        if k in x.columns:
            amt_col = k
            break

    # 최종 스키마 채우기
    x["거래소"] = "Lighter"
    x["유형"] = event_type_kr
    x["페어"] = ""  # 입출금/이체는 페어 개념이 없으니 공백
    x["통화"] = x[asset_col] if asset_col else ""
    x["가격"] = None  # 입출금/이체는 체결가 개념 없음

    usd = x[amt_col].apply(_safe_float) if amt_col else None
    x["적용환율"] = float(FX_KRW_PER_USD)
    x["원화가치"] = (usd.fillna(0.0) if usd is not None else 0.0) * x["적용환율"]

    return x[["일시","거래소","유형","페어","통화","가격","원화가치","적용환율","_sort_ts"]].copy()


# ====== 실행: trade+liquidation + 입출금/이체 합치기 ======
df_trades_all = fetch_trades(max_pages=50, limit=100, type_="all")
final_trades_df = trades_to_final_df(df_trades_all)

# withdraw / transfer는 l1_address 없이도 시도
df_withdraw = fetch_withdraw_history(max_pages=50)
df_transfer = fetch_transfer_history(max_pages=50)

events_withdraw = history_to_events_df(df_withdraw, "출금")
events_transfer = history_to_events_df(df_transfer, "이체")

# deposit은 l1_address가 필요해서 자동 추출 시도
l1_addr = L1_ADDRESS or try_get_l1_address_from_account()
if l1_addr:
    df_deposit = fetch_deposit_history(l1_address=l1_addr, max_pages=50)
    events_deposit = history_to_events_df(df_deposit, "입금")
else:
    events_deposit = pd.DataFrame(columns=events_withdraw.columns)

timeline_df = pd.concat(
    [final_trades_df, events_deposit, events_withdraw, events_transfer],
    ignore_index=True
)

# 시간순 정렬 (datetime이 없는 row는 뒤로)
timeline_df = timeline_df.sort_values(by=["_sort_ts"], na_position="last").reset_index(drop=True)

# 보기용 컬럼만
timeline_view = timeline_df.drop(columns=["_sort_ts"])
timeline_view.to_csv("test_lighter_sj_v0.3.csv")
# timeline_view


In [64]:
timeline_view.tail(10)

Unnamed: 0,일시,거래소,유형,페어,통화,가격,원화가치,적용환율
1576,2026-01-12-19-18-17,Lighter,매도,LIT/USDC,,2.3562,3583.78,1300.0
1577,2026-01-12-19-18-17,Lighter,매도,LIT/USDC,,2.3562,649981.3,1300.0
1578,2026-01-12-19-18-18,Lighter,매도,LIT/USDC,,2.3562,1225224.0,1300.0
1579,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,120072.0,1300.0
1580,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,2634569.0,1300.0
1581,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,2169198.0,1300.0
1582,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,3810048.0,1300.0
1583,2026-01-12-19-18-19,Lighter,매도,LIT/USDC,,2.3562,3026763.0,1300.0
1584,2026-01-12-19-22-41,Lighter,이체,,,,3900000.0,1300.0
1585,2026-01-12-19-23-01,Lighter,이체,,,,19795890.0,1300.0


In [68]:
print("입금 rows:", len(events_deposit), "출금 rows:", len(events_withdraw), "이체 rows:", len(events_transfer))
timeline_view.head(30)


입금 rows: 47 출금 rows: 0 이체 rows: 9


Unnamed: 0,일시,거래소,유형,페어,통화,가격,원화가치,적용환율
0,2025-04-16-16-59-20,Lighter,입금,,,,52000.0,1300.0
1,2025-04-16-16-59-42,Lighter,매도,market_0,,1563.21,1025434.0,1300.0
2,2025-04-16-17-00-35,Lighter,매수,market_0,,1562.75,1025133.0,1300.0
3,2025-04-16-17-00-47,Lighter,매도,KAITO,,0.69881,257909.8,1300.0
4,2025-04-16-17-03-42,Lighter,매수,KAITO,,0.69677,257156.9,1300.0
5,2025-04-16-17-04-03,Lighter,매도,ENA,,0.26926,520436.5,1300.0
6,2025-04-16-17-06-26,Lighter,매수,ENA,,0.27037,522582.0,1300.0
7,2025-04-16-17-06-54,Lighter,매도,TRUMP,,7.8779,250706.3,1300.0
8,2025-04-16-17-13-51,Lighter,매수,TRUMP,,7.8161,248739.6,1300.0
9,2025-04-16-17-14-27,Lighter,매수,TRUMP,,7.8178,261497.6,1300.0


In [73]:
!curl http://ipv4bot.whatismyipaddress.com/

curl: (6) Could not resolve host: ipv4bot.whatismyipaddress.com
