In [None]:
from shared.logging_config import setup_logging
# Initialise logging for this notebook session using the project's shared helper.
# NOTE(review): presumably this installs the handlers/format used by later cells'
# log output — confirm against shared.logging_config.
setup_logging()

In [None]:
from streaming import binance_websocket, config

def print_message(data):
    """Example callback: echo each received payload to the cell output."""
    print(data)


# Collect the constructor arguments in one place so they are easy to tweak.
client_options = {
    "symbols": ["BTCUSDT", "ETHUSDT"],
    "streams": ["trade", "kline_1m"],
    "callback": print_message,
    "config": config.StreamingConfig(),
}
client = binance_websocket.BinanceWebSocketClient(**client_options)


In [None]:
# Start the WebSocket client.
# Top-level `await` works here because IPython/Jupyter runs cells inside an event loop.
# NOTE(review): connect() presumably keeps running while the stream is open, so this
# cell may not return until the connection ends or the kernel is interrupted — confirm.
await client.connect()

In [None]:
# Ad-hoc check of symbol parsing on a sample stream identifier.
# NOTE(review): _extract_symbol is a private helper (leading underscore); calling it
# directly is fine for inspection here, but it is not a stable public API.
client._extract_symbol("stream=btcusdt@trade")  # Example usage of _extract_symbol method

In [8]:
from kafka import KafkaProducer
import json

# Smoke test: publish a single JSON-encoded message to the raw ETHUSDT trade topic.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],  # Adjust to 'kafka:29092' if running inside Docker
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

try:
    # Send a test message. send() is asynchronous and returns a future.
    send_future = producer.send(
        'crypto_raw_ethusdt_trade',
        value={'symbol': 'ETHUSDT', 'stream_type': 'trade', 'data': 'test'},
    )
    producer.flush()
    # Resolve the future so a failed delivery raises here instead of being silently dropped.
    send_future.get(timeout=10)
finally:
    # Always release the broker connections, even if the send failed.
    producer.close()

2025-08-11 22:16:07,834 - kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
2025-08-11 22:16:07,838 - kafka.conn - INFO - Probing node bootstrap-0 broker version
2025-08-11 22:16:07,840 - kafka.conn - INFO - <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Connection complete.
2025-08-11 22:16:07,947 - kafka.conn - INFO - Broker version identified as 2.5.0
2025-08-11 22:16:07,950 - kafka.conn - INFO - Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2025-08-11 22:16:07,963 - kafka.conn - INFO - <BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: connecting to localhost:9092 [('127.0.0.1', 9092) IPv4]
2025-08-11 22:16:07,965 - kafka.conn - INFO - <BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv4 ('127.0.0.1', 9092)]>: Con

In [None]:
from kafka import KafkaConsumer
import json  # Used by the deserializer below; imported here so the cell runs on its own

KAFKA_SERVERS = ['localhost:9092']  # Change to your Kafka servers
TEST_TOPIC = 'crypto_raw_ethusdt_trade'
MAX_MESSAGES = 5  # Upper bound on messages to read in this smoke test

# Smoke test: read up to MAX_MESSAGES messages from TEST_TOPIC, starting at the
# earliest offset, and print each one. Iteration also ends if no message arrives
# within consumer_timeout_ms.
consumer = None
try:
    consumer = KafkaConsumer(
        TEST_TOPIC,
        bootstrap_servers=KAFKA_SERVERS,
        auto_offset_reset='earliest',
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        consumer_timeout_ms=5000  # 5 second timeout
    )

    print(f"   Consuming from '{TEST_TOPIC}'...")
    message_count = 0

    for message in consumer:
        message_count += 1
        print(f"   ✓ Received message #{message_count}")
        print(f"     Key: {message.key}")
        print(f"     Value: {json.dumps(message.value, indent=2)}")
        print(f"     Partition: {message.partition}")
        print(f"     Offset: {message.offset}")

        # Just read a few messages for testing
        if message_count >= MAX_MESSAGES:
            break

    if message_count == 0:
        print(f"   ⚠ No messages found in topic '{TEST_TOPIC}'")
    else:
        print(f"   ✓ Successfully consumed {message_count} messages")

except Exception as e:
    # Best-effort test cell: report the failure instead of raising.
    print(f"   ✗ Consumer error: {e}")
finally:
    # Close the consumer on every path; it is still None if construction failed.
    if consumer is not None:
        consumer.close()