In [13]:
import os
import json
import asyncio
from datetime import datetime

import pytz
from termcolor import cprint
from websockets import connect  # pip install websockets

symbols = ["btcusdt", "ethusdt", "solusdt", "bnbusdt", "dogeusdt", "wifusdt"]
websocket_url_base = "wss://fstream.binance.com/ws/"
trades_filename = "binance_trades.csv"

# Check if the CSV file exists, if not create it with header
if not os.path.isfile(trades_filename):
    with open(trades_filename, "w") as f:
        f.write(
            "Event Time,Symbol,Aggregate Trade ID,Price,Quantity,Trade Time,Is Buyer Maker\n"
        )


class TradeAggregator:
    def __init__(self):  # <- fixed: __init__ (double underscore)
        self.trade_buckets = {}

    async def add_trade(self, symbol, second, usd_size, is_buyer_maker):
        """
        Aggregate trades into 1-second buckets keyed by (symbol, second, side).
        """
        trade_key = (symbol, second, is_buyer_maker)
        self.trade_buckets[trade_key] = self.trade_buckets.get(trade_key, 0) + usd_size

    async def check_and_print_trades(self):
        """
        Once the current time passes a bucket's second, print and delete it
        if the notional size is big enough.
        """
        timestamp_now = datetime.utcnow().strftime("%H:%M:%S")
        deletions = []

        for trade_key, usd_size in self.trade_buckets.items():
            symbol, second, is_buyer_maker = trade_key  # <- unpack key

            # only print buckets whose second has passed
            if second < timestamp_now and usd_size > 500000:
                attrs = ["bold"]
                back_color = "on_blue" if not is_buyer_maker else "on_magenta"
                trade_type = "BUY" if not is_buyer_maker else "SELL"

                # convert to millions for display
                size_m = usd_size / 1000000

                if usd_size > 3000000:
                    # very large trades - blinking
                    cprint(
                        f"\033[5m{trade_type} {symbol} {second} "
                        f"${size_m:.2f}m\033[0m",
                        "white",
                        back_color,
                        attrs=attrs,
                    )
                else:
                    cprint(
                        f"{trade_type} {symbol} {second} ${size_m:.2f}m",
                        "white",
                        back_color,  # <- fixed back_colour -> back_color
                        attrs=attrs,
                    )

                deletions.append(trade_key)

        # remove processed buckets
        for key in deletions:
            del self.trade_buckets[key]


trade_aggregator = TradeAggregator()


async def binance_trade_stream(uri, symbol, filename, aggregator: TradeAggregator):
    """
    Stream aggTrade data for a single symbol and feed into the aggregator.
    """
    async with connect(uri) as websocket:
        while True:
            try:
                message = await websocket.recv()
                data = json.loads(message)

                price = float(data["p"])
                qty = float(data["q"])
                usd_size = price * qty

                # trade time from Binance (ms)
                trade_dt = datetime.fromtimestamp(
                    data["T"] / 1000, pytz.timezone("US/Eastern")
                )
                readable_trade_time = trade_dt.strftime("%H:%M:%S")

                # m = is buyer the market maker (True -> sell, False -> buy)
                is_buyer_maker = data["m"]

                await aggregator.add_trade(
                    symbol.upper().replace("USDT", ""),
                    readable_trade_time,
                    usd_size,
                    is_buyer_maker,
                )

                # (optional) you can also log raw trades to CSV here if you want

            except Exception as e:
                cprint(f"Error in stream for {symbol}: {e}", "white", "on_red")
                await asyncio.sleep(5)


async def print_aggregated_trades_every_second(aggregator: TradeAggregator):
    """
    Once per second, flush any completed buckets from the aggregator.
    """
    while True:
        await asyncio.sleep(1)
        await aggregator.check_and_print_trades()


async def main():
    filename = "binance_trades_big.csv"

    # create one stream task per symbol
    trade_stream_tasks = [
        binance_trade_stream(
            f"{websocket_url_base}{symbol}@aggTrade", symbol, filename, trade_aggregator
        )
        for symbol in symbols
    ]

    print_task = asyncio.create_task(
        print_aggregated_trades_every_second(trade_aggregator)
    )

    await asyncio.gather(*trade_stream_tasks, print_task)


if __name__ == "__main__":
    asyncio.run(main())


RuntimeError: asyncio.run() cannot be called from a running event loop