In [1]:
pip install websocket-client pandas kafka-python

Note: you may need to restart the kernel to use updated packages.


In [2]:
import json
import websocket
import pandas as pd
from kafka import KafkaProducer
import threading

# Kafka setup
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))

assets = 'btcusdt@kline_1m'

def manipulation(source):
    rel_data = source['data']['k']['c']
    evt_time = pd.to_datetime(source['data']['E'], unit='ms')
    df = pd.DataFrame([rel_data], columns=[source['data']['s']], index=[evt_time])
    df.index.name = 'timestamp'
    df = df.astype(float)
    df = df.reset_index()
    data_dict = df.to_dict(orient='records')[0]
    print(data_dict)  # Print the data dictionary
    producer.send('bitcoin-data', value=data_dict)

def on_message(ws, message):
    message = json.loads(message)
    manipulation(message)

socket = "wss://stream.binance.com:9443/stream?streams=" + assets
ws = websocket.WebSocketApp(socket, on_message=on_message)

def run_ws():
    ws.run_forever()

ws_thread = threading.Thread(target=run_ws)
ws_thread.start()


{'timestamp': Timestamp('2024-05-22 22:37:38.092000'), 'BTCUSDT': 69119.2}
{'timestamp': Timestamp('2024-05-22 22:37:40.243000'), 'BTCUSDT': 69119.2}
{'timestamp': Timestamp('2024-05-22 22:37:42.315000'), 'BTCUSDT': 69116.11}
{'timestamp': Timestamp('2024-05-22 22:37:44.497000'), 'BTCUSDT': 69116.11}
{'timestamp': Timestamp('2024-05-22 22:37:46.605000'), 'BTCUSDT': 69123.04}
{'timestamp': Timestamp('2024-05-22 22:37:48.965000'), 'BTCUSDT': 69130.84}
{'timestamp': Timestamp('2024-05-22 22:37:51.056000'), 'BTCUSDT': 69132.2}
{'timestamp': Timestamp('2024-05-22 22:37:53.104000'), 'BTCUSDT': 69142.99}
{'timestamp': Timestamp('2024-05-22 22:37:55.665000'), 'BTCUSDT': 69148.44}
{'timestamp': Timestamp('2024-05-22 22:37:58.088000'), 'BTCUSDT': 69148.44}
{'timestamp': Timestamp('2024-05-22 22:38:00.005000'), 'BTCUSDT': 69161.83}
{'timestamp': Timestamp('2024-05-22 22:38:02.011000'), 'BTCUSDT': 69120.21}
{'timestamp': Timestamp('2024-05-22 22:38:04.020000'), 'BTCUSDT': 69116.37}
{'timestamp': T

In [3]:
from kafka import KafkaConsumer
import json
import psycopg2

# Kafka consumer setup
consumer = KafkaConsumer(
    'bitcoin-data', 
    bootstrap_servers='localhost:9092',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

# PostgreSQL database connection setup
conn = psycopg2.connect(
    dbname="Bitcoin",
    user="postgres",
    password="root",
    host="localhost"
)
cur = conn.cursor()

# Create a table if not exists
cur.execute("""
    CREATE TABLE IF NOT EXISTS bitcoin_data (
        timestamp TIMESTAMP,
        BTCUSDT FLOAT
    );
""")
conn.commit()

# Consume messages from Kafka and store them in PostgreSQL
record_limit = 10  # Set the limit of records to consume
records_consumed = 0  # Counter to track the number of records consumed

try:
    for message in consumer:
        data = message.value
        timestamp = data['timestamp']
        btcusdt = data['BTCUSDT']
        
        # Insert data into PostgreSQL table
        cur.execute("""
            INSERT INTO bitcoin_data (timestamp, BTCUSDT) VALUES (%s, %s);
        """, (timestamp, btcusdt))
        conn.commit()
        
        # Increment the counter
        records_consumed += 1
        
        # Check if the limit is reached
        if records_consumed >= record_limit:
            break
except KeyboardInterrupt:
    pass

# Close connections
cur.close()
conn.close()