In [2]:
# Jupyter already runs an asyncio event loop; patching it lets the asyncio.run(...)
# call in the __main__ block below execute inside the notebook.
import nest_asyncio
nest_asyncio.apply()

In [3]:
import asyncio
import traceback
from abc import ABC, abstractmethod

import aiohttp
import numpy as np
import pandas as pd
import requests

from utils import TimeManager


# https://api.mexc.com/api/v3/klines?symbol=FORTHUSDT&interval=60m&limit=3
# https://api.bybit.com/v5/market/kline?symbol=WBTCBTC&interval=60&category=spot&limit=3
# https://api-aws.huobi.pro/market/history/kline?period=4hour&size=3&symbol=btcusdt
# https://api.kucoin.com/api/v1/market/candles?symbol=BTC-USDT&type=4hour
# https://api.bitget.com/api/v2/spot/market/candles?symbol=BTCUSDT&granularity=4h&limit=3
# https://api.lbkex.com/v2/kline.do?size=3&type=hour4&time=1705015573&symbol=bch_usdt
# logic: descending order and discard the first data point


class BaseExchange(ABC):
    """Common configuration for exchange REST clients.

    Subclasses must implement ``get_exchange_info`` and ``fetch_klines_by_symbol``.
    Inheriting from ``ABC`` is what makes ``@abstractmethod`` enforced: neither this
    base class nor an incomplete subclass can be instantiated by mistake.

    Args:
        name: Exchange identifier, e.g. "BINANCE".
        base_url: Scheme + host of the REST API.
        info_endpoint: Path of the exchange/symbol metadata endpoint.
        klines_endpoint: Path of the candlestick endpoint.
        interval: Kline interval string understood by the exchange, e.g. "1h".
        limit_per_second: Request budget per second. Stored as
            ``max_request_per_second``; this class does not enforce it.
    """

    def __init__(self, name, base_url, info_endpoint, klines_endpoint, interval, limit_per_second):
        self.base_url = base_url
        self.name = name
        self.info_endpoint = info_endpoint
        self.klines_endpoint = klines_endpoint
        self.interval = interval
        self.max_request_per_second = limit_per_second

    @abstractmethod
    def get_exchange_info(self, filters: list = None):
        """Return the exchange's symbol metadata payload."""
        raise NotImplementedError

    @abstractmethod
    async def fetch_klines_by_symbol(self, symbol, st, et):
        """Return the klines of ``symbol`` between ``st`` and ``et``."""
        raise NotImplementedError


class BinanceExchange(BaseExchange):
    """Client for Binance USDⓈ-M futures public market data (``fapi`` endpoints).

    Klines come from ``/fapi/v1/continuousKlines`` for the PERPETUAL contract of a
    pair, returned in ascending open-time order.
    """

    # Candles requested per page from continuousKlines.
    KLINES_PAGE_LIMIT = 1000
    # Seconds to wait before retrying when the server gives no usable Retry-After header.
    RETRY_WAIT_SECONDS = 2
    # Per-request timeout in seconds (requests has no default timeout at all).
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self):
        super().__init__(
            name="BINANCE",
            base_url="https://fapi.binance.com",
            info_endpoint="/fapi/v1/exchangeInfo",
            klines_endpoint="/fapi/v1/continuousKlines",  # ascending
            interval="1h",
            # 20000 per minute spread over 60 s, divided by 5 -- presumably a safety
            # margin; TODO confirm where 20000 comes from. Stored only, not enforced.
            limit_per_second=int(20000 / 60 / 5),
        )
        self.info_url = self.base_url + self.info_endpoint
        self.klines_url = self.base_url + self.klines_endpoint
        # Step used to move the paging cursor past the last candle received.
        self.interval_in_ms = TimeManager.interval_str_to_ms(self.interval)

    def get_exchange_info(self, filters: list = None):
        """Fetch the raw ``exchangeInfo`` JSON payload.

        Args:
            filters: Currently ignored; kept for interface compatibility.

        Returns:
            The decoded JSON response; callers read its ``"symbols"`` list.

        Raises:
            requests.HTTPError: if the server answers with a non-2xx status.
            requests.Timeout: if no response arrives within REQUEST_TIMEOUT_SECONDS.
        """
        response = requests.get(self.info_url, timeout=self.REQUEST_TIMEOUT_SECONDS)
        # Fail here rather than hand an error payload to callers expecting "symbols".
        response.raise_for_status()
        return response.json()

    async def _fetch_klines_page(self, session, params):
        """GET one page of klines.

        Returns:
            ``(rows, None)`` on success, or ``(None, wait_seconds)`` when the request
            failed and should be retried after ``wait_seconds``.
        """
        try:
            async with session.get(self.klines_url, params=params) as response:
                if response.status != 200:
                    # Read the body as text: error bodies are not guaranteed to be JSON
                    # (e.g. proxy/gateway pages) and response.json() would raise on them.
                    body = await response.text()
                    print(f"Error Status: {response.status}, Error Msg: {body}.")
                    # Honour a numeric Retry-After header if the server sent one.
                    retry_after = response.headers.get("Retry-After", "")
                    wait = int(retry_after) if retry_after.isdigit() else self.RETRY_WAIT_SECONDS
                    return None, wait
                rows = await response.json()
        except Exception as e:
            print(f"Error processing binance data {e}")
            traceback.print_exc()
            return None, self.RETRY_WAIT_SECONDS

        if not isinstance(rows, list):
            print(f"Unexpected klines payload for {params['pair']}: {rows!r}")
            return None, self.RETRY_WAIT_SECONDS
        return rows, None

    async def fetch_klines_by_symbol(self, symbol, st, et, max_retries: int = 5):
        """Download all perpetual klines (at ``self.interval``) of ``symbol`` in [st, et].

        Pages forward ``KLINES_PAGE_LIMIT`` rows at a time, moving the start cursor to
        one interval after the last open time received, until the cursor reaches
        ``et`` or the API returns an empty page.

        Args:
            symbol: Pair name, e.g. "BTCUSDT" (sent as the ``pair`` parameter).
            st: Start time string in "%Y-%m-%d %H:%M:%S" format, converted with
                ``TimeManager.dt_str_to_ms``. NOTE(review): the sample output of this
                notebook (index starting 2022-12-31 17:00 UTC for st="2023-01-01
                00:00:00") suggests the string is read in local time -- confirm.
            et: End time string, same format as ``st``.
            max_retries: Consecutive failed requests retried before giving up on this
                symbol, so an invalid symbol or persistent outage cannot hang the
                caller forever.

        Returns:
            List of raw kline rows as returned by the API. May be partial or empty
            if retries were exhausted (a message is printed in that case).
        """
        st_in_ms = TimeManager.dt_str_to_ms(st, format="%Y-%m-%d %H:%M:%S")
        et_in_ms = TimeManager.dt_str_to_ms(et, format="%Y-%m-%d %H:%M:%S")
        data_lis = []
        failures = 0
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            while True:
                params = {
                    "pair": symbol,
                    "contractType": "PERPETUAL",
                    "interval": self.interval,
                    "startTime": st_in_ms,
                    "endTime": et_in_ms,
                    "limit": self.KLINES_PAGE_LIMIT,
                }
                data, wait_time = await self._fetch_klines_page(session, params)

                if data is None:
                    failures += 1
                    if failures > max_retries:
                        print(
                            f"Giving up on {symbol} after {failures} failed attempts; "
                            f"returning {len(data_lis)} rows."
                        )
                        break
                    await asyncio.sleep(wait_time)
                    continue

                failures = 0
                if len(data) == 0:
                    break
                data_lis.extend(data)
                # Row field 0 is the open time in ms; resume one interval later.
                st_in_ms = data[-1][0] + self.interval_in_ms
                if st_in_ms >= et_in_ms:
                    break

        return data_lis

async def get_binance_kline(symbol_lis: list, st: str, et: str) -> dict:
    """Fetch Binance perpetual klines for several symbols concurrently.

    All symbols are requested at once through ``asyncio.gather``; nothing here
    throttles them against ``max_request_per_second``, so very long symbol lists
    may run into the exchange's rate limits.

    Args:
        symbol_lis: Pair names, e.g. ["BTCUSDT", "ETHUSDT"].
        st: Start time string ("%Y-%m-%d %H:%M:%S"), forwarded to
            ``BinanceExchange.fetch_klines_by_symbol``.
        et: End time string, same format as ``st``.

    Returns:
        Dict mapping each symbol to a DataFrame indexed by ``ts``, with columns
        open, high, low, close, volume, usd_v, n_trades, taker_buy_v and
        taker_buy_usd. ``ts`` is the 7th raw field plus 1 ms, converted with
        ``TimeManager.ms_to_timestamp``. Values are not converted to numbers here;
        ``aggregate_multi_symbols_kline`` applies ``pd.to_numeric``.
    """
    binance = BinanceExchange()
    results = await asyncio.gather(
        *(binance.fetch_klines_by_symbol(symbol=symbol, st=st, et=et) for symbol in symbol_lis)
    )

    raw_columns = ["ots", "open", "high", "low", "close", "volume", "ts",
                   "usd_v", "n_trades", "taker_buy_v", "taker_buy_usd", "ig"]
    klines = {}
    for symbol, rows in zip(symbol_lis, results):
        kl_df = pd.DataFrame(rows, columns=raw_columns).drop(columns=["ots", "ig"])
        # +1 ms moves the stamp onto the next whole interval boundary
        # (the sample output shows on-the-hour timestamps).
        kl_df["ts"] = (kl_df["ts"] + 1).apply(TimeManager.ms_to_timestamp)
        klines[symbol] = kl_df.set_index("ts")

    return klines

def aggregate_multi_symbols_kline(klines: dict) -> pd.DataFrame:
    """Stack per-symbol klines into one (ts, symbol)-indexed numeric frame.

    BTCUSDT's index defines the time grid. Every symbol is cut to BTCUSDT's
    [first, last] timestamps and aligned onto that grid by label. Timestamps a
    symbol lacks (listed later, or a gap in its history) become NaN rows instead
    of causing the whole symbol to be dropped.

    Args:
        klines: Mapping symbol -> DataFrame indexed by timestamp, with the columns
            produced by ``get_binance_kline`` (values may be numeric strings).
            Must contain "BTCUSDT".

    Returns:
        DataFrame with a (ts, symbol) MultiIndex, ordered ts-major with symbols in
        the input dict's order. Columns are BTCUSDT's columns plus ``vwap`` =
        usd_v / volume, forward/back-filled per symbol. Symbols that fail to parse
        or share no timestamps with BTCUSDT are dropped, and a message is printed.

    Raises:
        KeyError: if "BTCUSDT" is missing from ``klines``.
    """
    btc = klines["BTCUSDT"]
    ts_index = btc.index
    st, et = ts_index[0], ts_index[-1]
    col = btc.columns.tolist() + ["vwap"]

    aligned = {}
    for symbol, kline in klines.items():
        try:
            kline = kline.loc[(kline.index >= st) & (kline.index <= et)]
            if kline.empty:
                raise ValueError("no rows inside the BTCUSDT time range")
            kline = kline.apply(pd.to_numeric)
            # volume can be 0, which makes vwap NaN; fill it from neighbouring candles.
            kline = kline.assign(vwap=(kline["usd_v"] / kline["volume"]).ffill().bfill())
            # Align by timestamp label: missing candles become NaN rows.
            frame = kline.reindex(index=ts_index, columns=col)
            if not frame.notna().to_numpy().any():
                raise ValueError("no timestamps in common with BTCUSDT")
            aligned[symbol] = frame.to_numpy(dtype=float)
        except Exception as e:
            print(f"Skipping {symbol}: {e!r}")

    # Dict order follows the input, so the row order is deterministic.
    symbols = list(aligned)
    data = np.full((len(ts_index), len(symbols), len(col)), np.nan)
    for j, symbol in enumerate(symbols):
        data[:, j, :] = aligned[symbol]

    multi_idx = pd.MultiIndex.from_product([ts_index, symbols], names=["ts", "symbol"])
    return pd.DataFrame(data.reshape(-1, len(col)), index=multi_idx, columns=col)

def get_binance_symbol_info(info: dict = None) -> pd.DataFrame:
    """Tabulate contract metadata from a Binance futures ``exchangeInfo`` payload.

    Args:
        info: An already-fetched exchangeInfo payload (a dict with a "symbols" list).
            When omitted, it is downloaded via ``BinanceExchange().get_exchange_info()``.
            Passing it lets callers reuse a cached payload.

    Returns:
        DataFrame with one row per symbol and columns symbol, contractType, status,
        tickSize and notional. tickSize / notional are None when no filter entry of
        that symbol carries the field.
    """
    if info is None:
        info = BinanceExchange().get_exchange_info()

    def _first_filter_value(filters, key):
        # Find the value by the field it lives in rather than by list position,
        # so a reordering of the filters list cannot silently break parsing.
        return next((f[key] for f in filters if key in f), None)

    columns = ["symbol", "contractType", "status", "tickSize", "notional"]
    rows = [
        {
            "symbol": each["symbol"],
            "contractType": each["contractType"],
            "status": each["status"],
            "tickSize": _first_filter_value(each.get("filters", ()), "tickSize"),
            "notional": _first_filter_value(each.get("filters", ()), "notional"),
        }
        for each in info["symbols"]
    ]
    # Explicit columns keep the schema even when "symbols" is empty.
    return pd.DataFrame(rows, columns=columns)

if __name__ == "__main__":
    # Contract metadata (tick size, notional, status, ...) for every futures symbol.
    info = get_binance_symbol_info()

    # One year of perpetual klines for two majors, stacked on BTCUSDT's time grid.
    # asyncio.run works inside Jupyter because nest_asyncio.apply() ran in the first cell.
    # NOTE(review): the printed index starts at 2022-12-31 17:00 UTC, so st/et appear
    # to be interpreted in the machine's local timezone (UTC+8 here) -- confirm.
    klines = asyncio.run(
        get_binance_kline(
            ["BTCUSDT", "ETHUSDT"],
            st="2023-01-01 00:00:00",
            et="2024-01-01 00:00:00",
        )
    )
    aggregated_kline = aggregate_multi_symbols_kline(klines)

    print(aggregated_kline, info)

                                       open      high       low     close  \
ts                        symbol                                            
2022-12-31 17:00:00+00:00 BTCUSDT  16586.10  16609.30  16577.30  16596.00   
                          ETHUSDT   1202.59   1203.90   1200.66   1202.49   
2022-12-31 18:00:00+00:00 BTCUSDT  16596.10  16600.10  16565.90  16573.40   
                          ETHUSDT   1202.50   1202.82   1198.35   1200.59   
2022-12-31 19:00:00+00:00 BTCUSDT  16573.40  16582.80  16566.20  16571.70   
...                                     ...       ...       ...       ...   
2023-12-31 15:00:00+00:00 ETHUSDT   2304.61   2306.65   2300.23   2302.51   
2023-12-31 16:00:00+00:00 BTCUSDT  42565.00  42628.00  42383.80  42489.60   
                          ETHUSDT   2302.52   2310.65   2291.86   2300.60   
2023-12-31 17:00:00+00:00 BTCUSDT  42489.60  42741.60  42465.40  42630.00   
                          ETHUSDT   2300.59   2311.74   2299.01   2306.35   

In [4]:
# Rich-display the contract metadata table built by get_binance_symbol_info() above.
info

Unnamed: 0,symbol,contractType,status,tickSize,notional
0,BTCUSDT,PERPETUAL,TRADING,0.10,100
1,ETHUSDT,PERPETUAL,TRADING,0.01,20
2,BCHUSDT,PERPETUAL,TRADING,0.01,20
3,XRPUSDT,PERPETUAL,TRADING,0.0001,5
4,EOSUSDT,PERPETUAL,TRADING,0.001,5
...,...,...,...,...,...
283,PORTALUSDT,PERPETUAL,TRADING,0.0001000,5
284,TONUSDT,PERPETUAL,TRADING,0.0001000,5
285,AXLUSDT,PERPETUAL,TRADING,0.0001000,5
286,MYROUSDT,PERPETUAL,TRADING,0.0000100,5
