In [1]:
pip install kafka-python websocket-client


Note: you may need to restart the kernel to use updated packages.


In [None]:
import json
import random
import time
import zlib
from datetime import datetime, timezone

import websocket
from kafka import KafkaProducer

# Binance WebSocket stream for BTC/USDT trades
socket_url = "wss://stream.binance.us:9443/ws/btcusdt@trade"

# Kafka setup: each value passed to send() is serialized as UTF-8 encoded JSON
producer = KafkaProducer(
    value_serializer=lambda record: json.dumps(record).encode('utf-8'),
    bootstrap_servers=['localhost:9092'],  # or EC2 IP
)



def on_message(ws, message):
    """Turn one streamed trade event into a transaction record and publish it.

    The record is sent to the 'transactions' Kafka topic through the
    module-level ``producer`` and echoed to standard output.

    Args:
        ws: The WebSocketApp that delivered the message (unused).
        message: JSON text of a single event. The fields read are "t" (used
            as txn_id), "p" and "q" (presumably price and quantity; their
            product becomes amount), "s" (hashed into merchant_id) and "T"
            (treated as milliseconds since the Unix epoch).

    Raises:
        json.JSONDecodeError: If ``message`` is not valid JSON.
        KeyError: If one of the expected fields is missing.
    """
    data = json.loads(message)

    # Convert to your transaction schema
    transaction = {
        "txn_id": data["t"],
        "user_id": random.randint(1000, 5000),  # fake user id
        "amount": float(data["p"]) * float(data["q"]),
        "location": random.choice(["US", "UK", "IN", "DE", "SG", "CA"]),
        # crc32 instead of hash(): str hashes are salted per interpreter
        # process, so hash() would map the same symbol to a different
        # merchant_id after every kernel restart.
        "merchant_id": zlib.crc32(data["s"].encode("utf-8")) % 10000,
        # Timezone-aware replacement for the deprecated utcfromtimestamp();
        # tzinfo is dropped so the ISO string keeps its offset-free format.
        "timestamp": datetime.fromtimestamp(data["T"] / 1000, tz=timezone.utc)
        .replace(tzinfo=None)
        .isoformat(),
    }

    # Push to Kafka
    producer.send('transactions', value=transaction)
    print("Produced:", transaction)

def on_error(ws, error):
    """Report a WebSocket error on standard output.

    Args:
        ws: The WebSocketApp that reported the error (unused).
        error: The error object handed over by the WebSocket client.
    """
    label = "Error:"
    print(label, error)

def on_close(ws, close_status_code=None, close_msg=None):
    """Log that the WebSocket connection has closed.

    websocket-client 1.x calls this handler with the close status code and
    close message in addition to the app, while older releases pass only the
    app; the defaulted parameters keep the handler callable either way.
    Reconnecting is left to the loop in start_stream().

    Args:
        ws: The WebSocketApp whose connection closed (unused).
        close_status_code: Close code supplied by the client library, if any.
        close_msg: Close reason supplied by the client library, if any.
    """
    print("WebSocket closed, reconnecting in 5s...")

def on_open(ws):
    """Announce that the connection to the stream has been established."""
    print("✅ Connected to Binance stream...")

def start_stream():
    """Consume the trade stream, reconnecting after every disconnect.

    Each pass builds a fresh WebSocketApp wired to the module's handlers and
    blocks in run_forever() until that connection ends, then waits 5 seconds
    and connects again. The loop lives here rather than in on_close because
    restarting from inside the close handler would begin a new run_forever()
    while the previous one is still unwinding, nesting one level deeper on
    every disconnect.

    The function does not return on its own; it ends only when an exception
    such as KeyboardInterrupt propagates out of it.
    """
    while True:
        ws = websocket.WebSocketApp(
            socket_url,
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )
        # Blocks until this connection ends.
        ws.run_forever()
        time.sleep(5)  # the delay announced by on_close

# Entry point: start consuming the stream when this code is executed directly.
# start_stream() blocks in run_forever(), so nothing placed after this call
# runs while the stream is active.
if __name__ == "__main__":
    start_stream()


✅ Connected to Binance stream...
Produced: {'txn_id': 30879727, 'user_id': 4971, 'amount': 1082.0, 'location': 'DE', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:05.091000'}
Produced: {'txn_id': 30879728, 'user_id': 3940, 'amount': 31.325747800000002, 'location': 'DE', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:10.981000'}
Produced: {'txn_id': 30879729, 'user_id': 2890, 'amount': 238.72380220000002, 'location': 'UK', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:25.518000'}
Produced: {'txn_id': 30879730, 'user_id': 4073, 'amount': 51.8495136, 'location': 'IN', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:25.551000'}
Produced: {'txn_id': 30879731, 'user_id': 2295, 'amount': 130.7039822, 'location': 'CA', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:27.418000'}
Produced: {'txn_id': 30879732, 'user_id': 1293, 'amount': 879.2813348, 'location': 'UK', 'merchant_id': 5810, 'timestamp': '2025-11-03T03:52:30.597000'}
Produced: {'txn_id': 30879733, 'user_id':