In [1]:
pip install kafka-python websocket-client


Note: you may need to restart the kernel to use updated packages.


In [None]:
import json
import random
import time
import zlib
from datetime import datetime, timezone

import websocket
from kafka import KafkaProducer

# Binance.US WebSocket endpoint for the btcusdt trade stream.
socket_url = "wss://stream.binance.us:9443/ws/btcusdt@trade"


def _encode_record(record):
    """Serialize *record* to UTF-8 encoded JSON bytes for use as a Kafka value."""
    return json.dumps(record).encode('utf-8')


# Kafka setup: a single module-level producer shared by every callback.
# Message values are passed through _encode_record before being sent.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    api_version=(0, 10),
    acks=1,
    value_serializer=_encode_record,
)

def on_message(ws, message):
    """Convert one trade event into a synthetic transaction and publish it.

    Args:
        ws: The WebSocketApp that delivered the message (unused).
        message: JSON text of a trade event. The keys read here are
            "t" (used as the transaction id), "p" and "q" (multiplied to
            get the amount) and "s" (symbol, used to derive merchant_id).

    Raises:
        KeyError: If the payload lacks any of the keys listed above.
        ValueError: If the payload is not valid JSON or "p"/"q" are not
            numeric strings.
    """
    data = json.loads(message)
    transaction = {
        "txn_id": data["t"],
        # user_id and location are random filler, not derived from the trade.
        "user_id": random.randint(1000, 5000),
        "amount": float(data["p"]) * float(data["q"]),
        "location": random.choice(["US", "UK", "IN", "DE", "SG", "CA"]),
        # crc32 instead of the built-in hash(): str hashes are salted per
        # process, so hash() would map the same symbol to a different
        # merchant_id after every restart. crc32 is non-negative, so the
        # result stays in 0..9999 as before.
        "merchant_id": zlib.crc32(data["s"].encode("utf-8")) % 10000,
        # Same naive-UTC ISO string that datetime.utcnow() produced, without
        # relying on the deprecated utcnow().
        "timestamp": datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
    }

    producer.send('transactions', value=transaction)
    # Flushing per message blocks until delivery; this trades throughput for
    # the guarantee that each record is sent before it is reported below.
    producer.flush()
    print("Produced:", transaction)

def on_error(ws, error):
    """Report an error raised by the WebSocket connection on stdout.

    Args:
        ws: The WebSocketApp that reported the error (unused).
        error: The error object; its str() form is printed.
    """
    report = f"Error: {error!s}"
    print(report)

def on_close(ws, close_status_code=None, close_msg=None):
    """Log that the WebSocket closed.

    The status and message parameters default to None so the callback works
    whether or not the library supplies them.

    Reconnection is NOT started from here: calling start_stream() inside this
    callback would run a new run_forever() nested inside the previous one,
    growing the call stack on every reconnect. The retry loop in
    start_stream() performs the reconnect instead.

    Args:
        ws: The WebSocketApp that closed (unused).
        close_status_code: Close status code, if one was provided.
        close_msg: Close reason, if one was provided (unused).
    """
    print(f"WebSocket closed (Code: {close_status_code}). Reconnecting...")


def on_open(ws):
    """Announce that the connection to the stream has been established."""
    print("✅ Connected to Binance stream...")


def start_stream(reconnect_delay=5):
    """Connect to the trade stream and keep reconnecting whenever it stops.

    Each iteration builds a fresh WebSocketApp and blocks in run_forever().
    When run_forever() returns, the function waits and connects again, so
    reconnects happen in a flat loop at constant stack depth.

    This function does not return normally; it ends only when an exception
    (for example KeyboardInterrupt) propagates out of it.

    Args:
        reconnect_delay: Seconds to wait before each reconnect attempt.
    """
    while True:
        ws = websocket.WebSocketApp(
            socket_url,
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )
        ws.run_forever()
        # Back off before reconnecting so a dead endpoint is not hammered.
        time.sleep(reconnect_delay)

# Entry point: start streaming only when this module is run directly
# (__name__ is "__main__"), not when it is imported by other code.
if __name__ == "__main__":
    start_stream()