In [12]:
import asyncio
import json
import websockets
import nest_asyncio
from collections import deque
from datetime import datetime
import threading
import time

nest_asyncio.apply()


class BinanceMarketData:
    """
    Subscriber for Binance ticker streams that tracks the latest price per symbol.

    The instance keeps two views of the incoming data:
      * ``latest_prices`` - one entry per subscribed symbol (keyed by the
        lowercase symbol) holding the most recent price and timestamps.
      * ``price_queue`` - a bounded ``deque`` with the most recent updates
        across all symbols; the oldest entries are discarded automatically.
    """

    def __init__(self, symbols, max_queue_size=10):
        """
        Initialize the Binance Market Data subscriber

        Args:
            symbols (list): List of trading symbols (e.g., ['BTCUSDT', 'ETHUSDT'])
            max_queue_size (int): Maximum size of price queue to prevent memory overflow
        """
        self.symbols = [symbol.lower() for symbol in symbols]  # stream names are built from lowercase symbols
        self.websocket_url = "wss://stream.binance.com:9443/ws/"
        self.price_queue = deque(maxlen=max_queue_size)  # Limited size queue
        self.is_running = False
        self.websocket = None
        # Event loop that owns the open websocket; needed to close it safely
        # from a different thread or after the loop has stopped.
        self._loop = None
        # Strong reference to a scheduled close task so it is not garbage
        # collected before it runs.
        self._close_task = None

        # Placeholder entry for every symbol until the first update arrives
        self.latest_prices = {
            symbol: {
                'price': None,
                'timestamp': None,
                'symbol': symbol.upper()
            }
            for symbol in self.symbols
        }

    def create_stream_name(self):
        """
        Creates the stream name for multiple symbols

        Returns:
            str: The per-symbol ``<symbol>@ticker`` names joined with '/'
        """
        return "/".join(f"{symbol}@ticker" for symbol in self.symbols)

    async def connect_websocket(self):
        """
        Establish WebSocket connection to the ticker stream of every symbol

        Returns:
            bool: True when the connection was opened, False otherwise
        """
        if not self.symbols:
            # Without symbols the URL would have no stream name at all.
            print("Failed to connect: no symbols to subscribe to")
            return False

        full_url = self.websocket_url + self.create_stream_name()

        print(f"Connecting to: {full_url}")
        print(f"Subscribing to symbols: {[s.upper() for s in self.symbols]}")

        try:
            self.websocket = await websockets.connect(full_url)
        except Exception as e:
            print(f"Failed to connect: {e}")
            return False

        # Remember the owning loop so stop_streaming() can reach it later.
        self._loop = asyncio.get_running_loop()
        print("Connected to Binance WebSocket")
        return True

    def process_ticker_data(self, data):
        """
        Process one decoded ticker message

        Messages for unknown symbols, messages without a price field and
        non-dict payloads are ignored. Payloads wrapped as
        ``{"stream": ..., "data": {...}}`` are unwrapped first.

        Args:
            data (dict): Decoded ticker message; reads 's' (symbol),
                'c' (price) and 'E' (event time in milliseconds)
        """
        try:
            # Accept both the bare payload and the wrapped form.
            if isinstance(data, dict) and isinstance(data.get('data'), dict):
                data = data['data']
            if not isinstance(data, dict):
                return

            symbol = str(data.get('s', '')).lower()
            if symbol not in self.latest_prices:
                return  # not a ticker update for a symbol we track

            raw_price = data.get('c')
            if raw_price is None:
                # Recording a default of 0 would look like a real price.
                return
            current_price = float(raw_price)
            timestamp = data.get('E', 0)  # Event time (ms since epoch)

            # Convert timestamp to readable local time with millisecond precision
            readable_time = datetime.fromtimestamp(timestamp / 1000).strftime('%H:%M:%S.%f')[:-3]

            # A new dict replaces the old entry instead of mutating it, so
            # snapshots handed out earlier stay unchanged.
            self.latest_prices[symbol] = {
                'price': current_price,
                'timestamp': readable_time,
                'symbol': symbol.upper(),
                'raw_timestamp': timestamp
            }

            # Add to queue (automatically removes oldest if queue is full)
            self.price_queue.append({
                'symbol': symbol.upper(),
                'price': current_price,
                'time': readable_time
            })

            print(f"{symbol.upper()}: ${current_price:,.4f} at {readable_time}")

        except Exception as e:
            print(f"Error processing data: {e}")

    async def listen_to_stream(self):
        """
        Main listening loop for WebSocket data

        Receives messages until ``is_running`` is cleared, the websocket is
        dropped, or the connection closes. A message that is not valid JSON is
        skipped instead of ending the stream.
        """
        try:
            while self.is_running and self.websocket:
                message = await self.websocket.recv()
                try:
                    data = json.loads(message)
                except (TypeError, ValueError) as e:
                    print(f"Error in listen_to_stream: {e}")
                    continue

                self.process_ticker_data(data)

        except websockets.exceptions.ConnectionClosed:
            print("WebSocket connection closed")
        except Exception as e:
            print(f"Error in listen_to_stream: {e}")

    async def _close_websocket(self):
        """Close the current websocket (if any) and forget it."""
        ws, self.websocket = self.websocket, None
        if ws is None:
            return
        try:
            await ws.close()
        except Exception as e:
            print(f"Error closing WebSocket: {e}")

    async def start_streaming(self):
        """
        Start the market data streaming process

        Connects, listens until the stream ends, and always releases the
        websocket afterwards.
        """
        self.is_running = True

        try:
            if await self.connect_websocket():
                print("📊 Starting market data stream...")
                print("=" * 50)

                await self.listen_to_stream()
            else:
                print("❌ Could not establish connection")
        finally:
            # Runs on normal exit, on errors and on cancellation, so the
            # connection is never left open.
            self.is_running = False
            await self._close_websocket()

    def stop_streaming(self):
        """
        Stop the streaming process

        Safe to call from the loop that owns the websocket, from another
        thread, or when no event loop is running at all.
        """
        self.is_running = False
        ws = self.websocket
        if ws is not None:
            try:
                running_loop = asyncio.get_running_loop()
            except RuntimeError:
                running_loop = None  # called from plain synchronous code

            owner_loop = self._loop
            if running_loop is not None and (owner_loop is None or owner_loop is running_loop):
                # Same loop as the websocket: schedule the close directly.
                self._close_task = asyncio.create_task(ws.close())
            elif owner_loop is not None and owner_loop.is_running():
                # The websocket lives on a loop in another thread.
                asyncio.run_coroutine_threadsafe(ws.close(), owner_loop)
            # Otherwise the owning loop is no longer running and there is
            # nothing left to schedule the close on.
        print("🛑 Streaming stopped")

    def get_latest_prices(self):
        """
        Get the latest prices for all subscribed symbols

        Returns:
            dict: Independent snapshot mapping each lowercase symbol to a copy
                of its latest price entry
        """
        return {symbol: dict(entry) for symbol, entry in self.latest_prices.items()}

    def get_price_history(self):
        """
        Get recent price history from the queue

        Returns:
            list: List of recent price updates, oldest first
        """
        return list(self.price_queue)

    def print_summary(self):
        """
        Print a summary of current prices and the queue fill level
        """
        print("\n" + "=" * 50)
        print("📊 CURRENT MARKET PRICES")
        print("=" * 50)

        for data in self.latest_prices.values():
            if data['price'] is not None:
                print(f"{data['symbol']}: ${data['price']:,.4f} (Updated: {data['timestamp']})")
            else:
                print(f"{data['symbol']}: No data received yet")

        print(f"\nQueue size: {len(self.price_queue)}/{self.price_queue.maxlen}")
        print("=" * 50)


def run_market_data_stream(symbols=None, max_queue_size=20):
    """
    Run the market data stream in the foreground until interrupted

    Args:
        symbols (list | None): Trading symbols to track. Defaults to
            ['BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'BNBUSDT'] when omitted.
        max_queue_size (int): Size of the bounded price history queue.
    """
    # Popular symbols: BTCUSDT, ETHUSDT, ADAUSDT, DOTUSDT, LINKUSDT, etc.
    if symbols is None:
        symbols_to_track = ['BTCUSDT', 'ETHUSDT', 'ADAUSDT', 'BNBUSDT']
    else:
        symbols_to_track = list(symbols)

    # Create market data subscriber with limited queue size (prevents memory overflow)
    market_data = BinanceMarketData(symbols_to_track, max_queue_size=max_queue_size)

    def stop_quietly():
        # stop_streaming() may need a running event loop to schedule the
        # socket close; after asyncio.run() has returned there may be none,
        # and that must not hide the summary or the original error.
        try:
            market_data.stop_streaming()
        except RuntimeError as e:
            print(f"Could not schedule WebSocket close: {e}")

    try:
        print("Starting Binance Market Data Subscriber")
        print(f"Tracking symbols: {symbols_to_track}")
        print("Press Ctrl+C to stop\n")

        # Run the async streaming function
        asyncio.run(market_data.start_streaming())

    except KeyboardInterrupt:
        print("\n🛑 Stopping market data stream...")
        stop_quietly()
        market_data.print_summary()

    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        stop_quietly()


def start_background_stream(symbols=None, max_queue_size=15):
    """
    Start market data stream in a background daemon thread

    Useful in interactive consoles and notebooks to keep the session responsive.
    The subscriber is created in the calling thread and attached to the
    returned thread as ``thread.market_data``, so the caller can read prices
    (``thread.market_data.get_latest_prices()``) or stop the stream.

    Args:
        symbols (list | None): Trading symbols to track. Defaults to
            ['BTCUSDT', 'ETHUSDT', 'ADAUSDT'] when omitted.
        max_queue_size (int): Size of the bounded price history queue.

    Returns:
        threading.Thread: The started daemon thread, carrying the subscriber
            in its ``market_data`` attribute
    """
    if symbols is None:
        symbols = ['BTCUSDT', 'ETHUSDT', 'ADAUSDT']

    # Built outside the thread so the caller keeps a handle on it; the
    # constructor only sets up in-memory state.
    market_data = BinanceMarketData(symbols, max_queue_size=max_queue_size)

    def background_task():
        try:
            asyncio.run(market_data.start_streaming())
        except Exception as e:
            print(f"Background stream error: {e}")

    # Start in background thread
    thread = threading.Thread(target=background_task, daemon=True)
    thread.market_data = market_data
    thread.start()
    print("Market data stream started in background thread")
    return thread

# Example usage functions
def get_single_price_update(symbol='BTCUSDT'):
    """
    Get a single price update for testing

    Args:
        symbol (str): Trading symbol to query. Defaults to 'BTCUSDT'.

    Returns:
        dict | None: Latest-price snapshot after one message was processed,
            or None when the connection could not be established
    """
    async def single_update():
        market_data = BinanceMarketData([symbol], max_queue_size=5)
        if not await market_data.connect_websocket():
            return None

        print("Getting single price update...")
        try:
            # Listen for one message
            message = await market_data.websocket.recv()
            market_data.process_ticker_data(json.loads(message))
        finally:
            # Close even when receiving or decoding fails, so the
            # connection is not leaked.
            await market_data.websocket.close()

        return market_data.get_latest_prices()

    return asyncio.run(single_update())

if __name__ == "__main__":
    # Run the stream in a background thread and keep the handle so later
    # cells can inspect it (e.g. stream_thread.is_alive()).
    stream_thread = start_background_stream()

    # Alternative: run the blocking foreground stream instead (uncomment to use)
    # run_market_data_stream()

    # When executed as a plain script the background thread is a daemon and
    # ends as soon as the main thread exits; uncomment to keep it alive.
    # try:
    #     while True:
    #         time.sleep(1)
    # except KeyboardInterrupt:
    #     print("Main thread stopped")

Market data stream started in background thread
Connecting to: wss://stream.binance.com:9443/ws/btcusdt@ticker/ethusdt@ticker/adausdt@ticker
Subscribing to symbols: ['BTCUSDT', 'ETHUSDT', 'ADAUSDT']


In [14]:
pric

NameError: name 'data' is not defined