In [1]:
import asyncio
import datetime
import json
import pandas as pd
import numpy as np
import websockets
import pyarrow as pa
import pyarrow.parquet as pq
import logging
from asyncio import CancelledError, TimeoutError
import threading


logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)

# OKX v5 endpoints: the public feed, and the business endpoint that block_ws
# uses for the block-tickers subscription.
url = "wss://ws.okx.com:8443/ws/v5/public"
url_block = 'wss://wsaws.okx.com:8443/ws/v5/business'

# Module-level DataFrames; base_ws / block_ws pass total_table and
# total_table_block to send_subscribe_msg as the starting tables.
df_ = pd.DataFrame()
total_table = pd.DataFrame()
total_table_block = pd.DataFrame()

# Subscription request sent on the public endpoint.
subs = {
    'op': 'subscribe',
    'args': (
        {'channel': 'liquidation-orders', 'instType': 'SWAP'},
        {'channel': 'trades', 'instId': 'BTC-USDT-SWAP'},
        {'channel': 'books5', 'instId': 'BTC-USDT-SWAP'},
        {'channel': 'open-interest', 'instId': 'BTC-USDT-SWAP'},
    ),
}

# Subscription request sent on the business endpoint.
subs_block = {
    'op': 'subscribe',
    'args': ({'channel': 'block-tickers', 'instId': 'BTC-USDT-SWAP'},),
}

# books5 layout: four fields per level (price, size, a field stored under the
# na/nal column, order count) for five levels on each side.
_BOOK_DEPTH = 5
_ASK_COLUMNS = [
    name
    for lvl in range(1, _BOOK_DEPTH + 1)
    for name in (f'ask_price{lvl}', f'ask_size{lvl}', f'na{lvl}', f'ask_counts{lvl}')
]
_BID_COLUMNS = [
    name
    for lvl in range(1, _BOOK_DEPTH + 1)
    for name in (f'bid_price{lvl}', f'bid_size{lvl}', f'nal{lvl}', f'bid_counts{lvl}')
]

# Flat channels: a row is ``channel`` followed by these fields of data[0].
_FLAT_FIELDS = {
    'trades': ('instId', 'tradeId', 'px', 'sz', 'side', 'ts', 'count'),
    'open-interest': ('instId', 'instType', 'oi', 'oiCcy', 'ts'),
    'block-tickers': ('instId', 'instType', 'volCcy24h', 'vol24h', 'ts'),
}

# Liquidation rows: fields taken from the first entry of ``details``.
_LIQUIDATION_FIELDS = ('side', 'posSide', 'bkPx', 'sz', 'bkLoss', 'ccy', 'ts')
# Only liquidations on these instruments are recorded.
_LIQUIDATION_INST_IDS = frozenset({'BTC-USDT-SWAP', 'ETH-USDT-SWAP'})


def _flatten_levels(levels):
    """Flatten a list of book levels (each a list of fields) into one flat list."""
    return [field for level in levels or () for field in level]


def _parse_row(channel, entry):
    """Convert the first ``data`` entry of a push into a ``{column: value}`` dict.

    Returns None when nothing should be recorded: unknown channels, and
    liquidation orders outside ``_LIQUIDATION_INST_IDS`` or without details.
    """
    row = {'channel': channel}
    if channel in _FLAT_FIELDS:
        row.update((field, entry.get(field)) for field in _FLAT_FIELDS[channel])
        return row
    if channel == 'books5':
        row.update(instId=entry.get('instId'), ts=entry.get('ts'), seqId=entry.get('seqId'))
        # Zip each side separately so a book with fewer than five levels
        # leaves the missing columns empty instead of shifting bids into ask
        # columns (or raising on a length mismatch).
        row.update(zip(_ASK_COLUMNS, _flatten_levels(entry.get('asks'))))
        row.update(zip(_BID_COLUMNS, _flatten_levels(entry.get('bids'))))
        return row
    if channel == 'liquidation-orders':
        inst_id = entry.get('instId')
        details = entry.get('details') or []
        if inst_id not in _LIQUIDATION_INST_IDS or not details:
            return None
        row['instId'] = inst_id
        row.update((field, details[0].get(field)) for field in _LIQUIDATION_FIELDS)
        return row
    return None


def _merge_rows(total_table, rows):
    """Return ``total_table`` with the buffered ``rows`` (list of dicts) appended."""
    if not rows:
        return total_table
    fresh = pd.DataFrame(rows)
    if total_table.empty:
        return fresh
    return pd.concat([total_table, fresh], axis=0, ignore_index=True)


async def send_subscribe_msg(ws, subs, total_table, flush_rows=120_000):
    """Subscribe on ``ws`` and collect parsed pushes, flushing to Parquet in batches.

    Sends ``subs`` as JSON, then consumes messages until the websocket's async
    iteration ends. Event messages (subscribe acks, errors) are printed. Each
    data push on a recorded channel becomes one row. Rows are buffered as dicts
    and turned into a DataFrame only when the table is flushed or returned.
    This avoids concatenating onto the whole table for every message, which
    would copy it each time.

    Args:
        ws: connected websocket; must support ``await ws.send(str)`` and
            ``async for`` over incoming text messages.
        subs: subscription request, serialized with ``json.dumps``.
        total_table: DataFrame of already-collected rows to append to.
        flush_rows: once the collected row count reaches this value, the table
            is passed to ``write_table_parquet`` and collection continues from
            the DataFrame it returns.

    Returns:
        The rows collected since the last flush (starting from ``total_table``)
        as a DataFrame, once the message stream ends.
    """
    await ws.send(json.dumps(subs))
    rows = []
    # Ordered union of column names, so the per-message shape print matches
    # the shape the merged DataFrame will have, without building it.
    columns = dict.fromkeys(total_table.columns)
    async for raw in ws:
        msg = json.loads(raw)
        ev = msg.get('event')
        if ev:
            # Error events carry code/msg but no 'arg'; print the whole message then.
            detail = msg.get('arg', msg)
            print(f'****************** event {ev} = {detail} ******************')
            continue
        data = msg.get('data')
        if not data:
            continue
        channel = (msg.get('arg') or {}).get('channel')
        row = _parse_row(channel, data[0])
        if row is None:
            # Nothing to record for this push; do not re-append an earlier row.
            continue
        rows.append(row)
        columns.update(dict.fromkeys(row))
        n_rows = len(total_table) + len(rows)
        print((n_rows, len(columns)))
        if n_rows >= flush_rows:
            total_table = await write_table_parquet(_merge_rows(total_table, rows))
            rows = []
            columns = dict.fromkeys(total_table.columns)
    return _merge_rows(total_table, rows)
            
async def table_concat(total_table, df_):
    """Return ``total_table`` with the rows of ``df_`` appended below it.

    The result gets a fresh 0..n-1 row index; its columns are the union of
    both frames' columns.
    """
    pieces = [total_table, df_]
    combined = pd.concat(pieces, ignore_index=True, axis="index")
    return combined

async def write_table_parquet(total_table, prefix='train'):
    """Write ``total_table`` to a timestamped Parquet file and return an empty DataFrame.

    The file is named ``{prefix}YYYY-mm-dd_HH-MM-SS-ffffff.parquet``. An explicit
    format is used because ``str(datetime)`` contains a space and colons, and
    colons are not allowed in Windows file names.

    The Arrow conversion and file write run in the event loop's default
    executor. This lets other coroutines on the loop, such as the websocket
    keepalive pings, keep running while a large table is serialized.

    Args:
        total_table: DataFrame to persist.
        prefix: file-name prefix (may include a directory path).

    Returns:
        A new empty DataFrame for the caller to continue accumulating into.
    """
    stamp = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')
    path = f'{prefix}{stamp}.parquet'

    def _write():
        pq.write_table(pa.Table.from_pandas(total_table), path)

    await asyncio.get_running_loop().run_in_executor(None, _write)
    return pd.DataFrame()

async def _subscribe_forever(endpoint, subscription, table, *, ping_interval, ping_timeout, retry_delay=1):
    """Keep one OKX subscription running, reconnecting after connection failures.

    Uses a loop rather than calling itself, so repeated reconnects do not grow
    the await chain. Cancellation is re-raised so the owning task can actually
    be stopped.

    Args:
        endpoint: websocket URL to connect to.
        subscription: request passed to ``send_subscribe_msg``.
        table: starting DataFrame passed to ``send_subscribe_msg``.
        ping_interval, ping_timeout: forwarded to ``websockets.connect``.
        retry_delay: seconds to wait before reconnecting.
    """
    while True:
        try:
            async with websockets.connect(
                endpoint, ping_interval=ping_interval, ping_timeout=ping_timeout
            ) as ws:
                await send_subscribe_msg(ws, subscription, table)
        except CancelledError:
            print('Already cancelled the client...')
            raise
        except (websockets.WebSocketException, OSError, TimeoutError) as e:
            # Dropped connections, handshake failures, network errors and
            # timeouts are retried; other exceptions propagate.
            print(e)
        await asyncio.sleep(retry_delay)


async def base_ws():
    """Stream the public-endpoint subscription ``subs`` into ``total_table``, reconnecting forever."""
    await _subscribe_forever(url, subs, total_table, ping_interval=15, ping_timeout=60)


async def block_ws():
    """Stream the business-endpoint subscription ``subs_block`` into ``total_table_block``, reconnecting forever."""
    await _subscribe_forever(url_block, subs_block, total_table_block, ping_interval=10, ping_timeout=360)
    
# Run all WebSocket clients on this thread's own event loop.
def run_websocket_servers():
    """Run ``base_ws`` and ``block_ws`` concurrently on a fresh event loop.

    The clients are gathered rather than awaited one after another: ``base_ws``
    keeps reconnecting and never returns, so sequential awaiting would never
    start ``block_ws``. ``asyncio.run`` creates the loop, cancels any tasks
    still pending when the gather ends, and closes the loop.
    """
    async def _run_all():
        await asyncio.gather(base_ws(), block_ws())

    asyncio.run(_run_all())
    
# Entry point: start the WebSocket clients on a background thread.
def main():
    """Launch ``run_websocket_servers`` on a new thread and return without joining it."""
    worker = threading.Thread(target=run_websocket_servers)
    worker.start()


if __name__ == "__main__":
    main()

(5, 47)
(6, 47)
(7, 47)
(8, 52)
(9, 52)
(10, 52)
(11, 52)
(12, 52)
