In [None]:
import asyncio
import nest_asyncio
import os, sys
import datetime

import warnings
# Patch asyncio so asyncio.run() can be called from a notebook cell even
# though the kernel already has an event loop running.
nest_asyncio.apply()

warnings.simplefilter("error", RuntimeWarning)  # Turn RuntimeWarning into an exception

# Make the local Thunder/Binance checkout importable; this must run before the
# project imports below. The checkout is expected under ~/GitHub.
home_path = os.path.expanduser("~")
sys.path.append(os.path.join(home_path, "GitHub", "Thunder", "Binance"))

import Workspace.Utils.TradingUtils as tr_utils# import 
from Workspace.Services.PublicData.Receiver.FuturesMarketWebsocket import (
    FuturesMarketWebsocket as futures_mk_ws,
)
import Workspace.Utils.BaseUtils as base_utils
import SystemConfig
# import Workspace.Utils.BaseUtils as base_utils

# Symbols to subscribe to are taken from the shared project configuration.
symbols = SystemConfig.Streaming.symbols

# Stream names handed to open_stream_connection() inside main().
stream_1, stream_2 = "depth", "trade"

# One websocket client per stream; both are built from the same symbol list.
ins_st1, ins_st2 = futures_mk_ws(symbols), futures_mk_ws(symbols)
async def main(minute: int, symbol: str = "BTCUSDT"):
    """Collect depth and trade websocket messages for one symbol.

    Waits via ``base_utils.sleep_next_minute(1)`` (presumably until the next
    minute boundary -- confirm against BaseUtils), opens the ``stream_1`` and
    ``stream_2`` connections on the module-level clients, then alternately
    receives one message from each stream until the deadline passes.

    Args:
        minute: Number of minutes to collect for. Collection stops at the top
            of the minute that lies ``minute`` minutes after the starting one.
        symbol: Only messages whose ``["data"]["s"]`` equals this value are
            kept. Defaults to "BTCUSDT".

    Returns:
        A tuple ``(st1_result, st2_result)`` with the kept messages of
        ``stream_1`` and ``stream_2`` respectively.

    Note:
        The deadline is only checked between iterations, and every iteration
        awaits one message from each stream, so a quiet stream can delay the
        end of the collection.
    """
    print("  ⏳ make the program wait")
    await base_utils.sleep_next_minute(1)
    st1_result = []
    st2_result = []

    start_date = datetime.datetime.now()
    # Use an absolute deadline instead of comparing minute-of-hour values:
    # with a start at e.g. xx:59 the old target minute (60) was never reached
    # because datetime.minute wraps back to 0, so the loop never terminated.
    deadline = start_date.replace(second=0, microsecond=0) + datetime.timedelta(minutes=minute)

    await ins_st1.open_stream_connection(stream_1)
    print(f"  🔗 '{stream_1}' Websocket connect")
    await ins_st2.open_stream_connection(stream_2)
    print(f"  🔗 '{stream_2}' Websocket connect")
    try:
        i = 1
        print("  🚀 start receiving start")
        while datetime.datetime.now() < deadline:
            base_utils.std_print(f"        🏃🏃🏻‍♀️🏃🏽‍♂️💨 {i}회 진행")
            st1_message = await ins_st1.receive_message()
            st2_message = await ins_st2.receive_message()
            i += 1
            # Assumes every message carries the symbol under ["data"]["s"]
            # -- TODO confirm for non-data frames such as subscription replies.
            if st1_message["data"]["s"] == symbol:
                st1_result.append(st1_message)
            if st2_message["data"]["s"] == symbol:
                st2_result.append(st2_message)
        print("\n  ✅ 데이터 수신 완료")
    finally:
        # Close both sockets even when receiving or parsing a message fails.
        await ins_st1.close_connection()
        await ins_st2.close_connection()
    return st1_result, st2_result

# Collect for one minute; main() returns (depth_messages, trade_messages).
message = asyncio.run(main(1))
depth_messages, trade_messages = message

# NOTE(review): absolute, user-specific paths -- adjust before running elsewhere.
depth_path = "/Users/hhh/Desktop/depth.json"
trade_path = "/Users/hhh/Desktop/trade.json"

# Write each stream to its own file. Previously the whole (depth, trade)
# tuple was written to both files, so the two outputs were identical.
base_utils.save_to_json(depth_path, depth_messages)
base_utils.save_to_json(trade_path, trade_messages)


In [1]:
import asyncio
import json
import pandas as pd
import websockets
import nest_asyncio

# Patch asyncio so asyncio.run() can be called from a notebook cell even
# though the kernel already has an event loop running.
nest_asyncio.apply()

# Binance WebSocket URL (raw "btcusdt@depth" stream)
BINANCE_WS_URL = "wss://stream.binance.com:9443/ws/btcusdt@depth"

# Stored order-book levels per side; process_depth_update trims each side
# to at most MAX_HISTORY rows.
depth_data = {
    "bids": pd.DataFrame(columns=["price", "quantity"]),
    "asks": pd.DataFrame(columns=["price", "quantity"]),
}

# Maximum number of price levels kept per side
MAX_HISTORY = 300

# Cumulative delta volume (CDV), updated by process_depth_update
cumulative_delta_volume = 0

def _total_quantity(book):
    """Return the summed quantity of one book side (0 for an empty side)."""
    return book["quantity"].sum() if not book.empty else 0


def _merge_depth_levels(book, levels, bids_side):
    """Apply one side of a depth update to the stored book and return the result."""
    update = pd.DataFrame(levels, columns=["price", "quantity"]).astype(float)
    if update.empty:
        return book
    # Skip concat while the book is still the empty seed frame so its object
    # dtype cannot leak into the merged result.
    merged = update if book.empty else pd.concat([book, update], ignore_index=True)
    # keep="last": the incoming quantity replaces the stored one for a price
    # that is already in the book (the default keep="first" kept stale values).
    merged = merged.drop_duplicates(subset=["price"], keep="last")
    # In Binance diff-depth updates a quantity of 0 means the level was
    # removed, so drop such levels only after they have overridden the old row.
    merged = merged[merged["quantity"] > 0]
    if bids_side:
        return merged.nlargest(MAX_HISTORY, "price")
    return merged.nsmallest(MAX_HISTORY, "price")


async def process_depth_update(message):
    """Apply a depth message received over the websocket and run the analysis.

    Args:
        message: JSON text of one depth update; must contain the keys "b"
            (bid levels) and "a" (ask levels), each a list of
            [price, quantity] pairs.

    Side effects:
        Replaces ``depth_data["bids"]`` / ``depth_data["asks"]`` with the
        merged book (at most MAX_HISTORY levels per side), adds
        ``delta_bid - delta_ask`` to ``cumulative_delta_volume`` and calls
        ``analyze_order_book()``.

    Raises:
        KeyError: if the decoded message lacks "b" or "a".
    """
    global cumulative_delta_volume

    data = json.loads(message)
    bids = data["b"]
    asks = data["a"]

    # Totals of the stored book before this update is applied.
    prev_bid_total = _total_quantity(depth_data["bids"])
    prev_ask_total = _total_quantity(depth_data["asks"])

    depth_data["bids"] = _merge_depth_levels(depth_data["bids"], bids, bids_side=True)
    depth_data["asks"] = _merge_depth_levels(depth_data["asks"], asks, bids_side=False)

    # Compare book-to-book. The previous code compared the total of a single
    # partial update against the total of the whole stored book.
    delta_bid = _total_quantity(depth_data["bids"]) - prev_bid_total
    delta_ask = _total_quantity(depth_data["asks"]) - prev_ask_total

    # Update the cumulative delta volume
    cumulative_delta_volume += (delta_bid - delta_ask)

    # Run the analysis on the refreshed book
    analyze_order_book()

def analyze_order_book():
    """Print an analysis of the order book currently held in ``depth_data``.

    Reports the largest bid and ask level by quantity (the buy/sell walls),
    the spread between the lowest ask and the highest bid, the bid/ask
    quantity imbalance in percent, the module-level cumulative delta volume,
    and each wall's share of its side's total quantity. Prints nothing when
    either side of the book is empty.
    """
    bid_book, ask_book = depth_data["bids"], depth_data["asks"]
    if bid_book.empty or ask_book.empty:
        return

    # Largest single level on each side (buy wall / sell wall).
    bid_wall = bid_book.nlargest(1, "quantity")
    ask_wall = ask_book.nlargest(1, "quantity")
    bid_wall_price = bid_wall["price"].iloc[0]
    bid_wall_qty = bid_wall["quantity"].iloc[0]
    ask_wall_price = ask_wall["price"].iloc[0]
    ask_wall_qty = ask_wall["quantity"].iloc[0]

    # Spread: lowest ask minus highest bid.
    spread = ask_book["price"].min() - bid_book["price"].max()

    # Order book imbalance, in percent of the combined quantity.
    bid_total = bid_book["quantity"].sum()
    ask_total = ask_book["quantity"].sum()
    imbalance = (bid_total - ask_total) / (bid_total + ask_total) * 100

    # Share of each side's total quantity that sits in its wall.
    buy_pressure = (bid_wall_qty / bid_total) * 100
    sell_pressure = (ask_wall_qty / ask_total) * 100

    # Emit the report line by line.
    report_lines = (
        "📊 실시간 Depth 분석 📊",
        f"🟢 매수벽: 가격 {bid_wall_price}, 수량 {bid_wall_qty}",
        f"🔴 매도벽: 가격 {ask_wall_price}, 수량 {ask_wall_qty}",
        f"⚡ 스프레드: {spread:.2f}",
        f"📈 시장 불균형 (Imbalance): {imbalance:.2f}%",
        f"🔄 누적 델타 볼륨 (CDV): {cumulative_delta_volume:.2f}",
        f"📊 매수 강도 (Buy Pressure): {buy_pressure:.2f}% | 매도 강도 (Sell Pressure): {sell_pressure:.2f}%",
        "=" * 50,
    )
    for line in report_lines:
        print(line)

async def listen_binance_depth():
    """Receive Binance depth messages and pass each one to the processor.

    Connects to BINANCE_WS_URL and awaits process_depth_update for every
    received message. The first exception raised while receiving or
    processing is printed and ends the loop; leaving the ``async with`` block
    then closes the connection.
    """
    async with websockets.connect(BINANCE_WS_URL) as connection:
        try:
            while True:
                await process_depth_update(await connection.recv())
        except Exception as error:
            print(f"❌ WebSocket 오류 발생: {error}")
            
# Start the listener. Its receive loop has no exit condition of its own, so
# this cell runs until an exception ends the loop or the kernel is interrupted.
asyncio.run(listen_binance_depth())


KeyboardInterrupt: 