In [1]:
import json
import websocket
import pandas as pd
from kafka import KafkaProducer
import threading

import os

# Kafka setup. The broker address defaults to the local broker this notebook was
# written against; set KAFKA_BOOTSTRAP_SERVERS to target a different cluster.
producer = KafkaProducer(
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    # Records are plain dicts; encode them as UTF-8 JSON (the consumer cell decodes
    # them with json.loads).
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Combined-stream name(s) appended to the Binance websocket URL.
assets = 'btcusdt@kline_1m'

def _kline_record(source):
    """Flatten one Binance combined-stream kline message into a flat record.

    Millisecond timestamps become ISO-8601 strings and the numeric string fields
    become floats, so the record can be serialized with json.dumps.

    Raises:
        KeyError: a required field is missing.
        TypeError / ValueError: the payload has the wrong shape or a numeric
            field cannot be converted.
    """
    payload = source['data']
    kline = payload['k']

    def iso(ms):
        # Payload timestamps are epoch milliseconds.
        return pd.to_datetime(ms, unit='ms').isoformat()

    return {
        'event_time': iso(payload['E']),
        'symbol': payload['s'],
        'kline_start_time': iso(kline['t']),
        'kline_close_time': iso(kline['T']),
        'interval': kline['i'],
        'first_trade_id': kline['f'],
        'last_trade_id': kline['L'],
        'open_price': float(kline['o']),
        'close_price': float(kline['c']),
        'high_price': float(kline['h']),
        'low_price': float(kline['l']),
        'base_asset_volume': float(kline['v']),
        'number_of_trades': kline['n'],
        'is_kline_closed': kline['x'],
        'quote_asset_volume': float(kline['q']),
        'taker_buy_base_asset_volume': float(kline['V']),
        'taker_buy_quote_asset_volume': float(kline['Q']),
        'ignore': kline['B'],
    }


def manipulation(source, topic='bitcoin-data'):
    """Convert one websocket message into a flat record and publish it to Kafka.

    Args:
        source: decoded JSON message from the Binance combined stream; the kline
            is read from ``source['data']['k']``.
        topic: Kafka topic to publish to. Defaults to the topic the consumer
            cell subscribes to.

    Malformed messages (missing keys, wrong shapes, unparsable numbers) are
    printed and skipped instead of raised, so a single bad frame does not
    escape the websocket callback.
    """
    try:
        data_dict = _kline_record(source)
    except (KeyError, TypeError, ValueError) as e:
        print(f"{type(e).__name__}: {e}")
        print(f"Source: {source}")
        return
    print(data_dict)  # Print the data dictionary for debugging
    # NOTE(review): send() is asynchronous and its returned future is ignored,
    # so broker-side delivery failures are not reported here.
    producer.send(topic, value=data_dict)

def on_message(ws, message):
    """WebSocketApp message callback: decode the raw JSON frame and hand it to manipulation().

    ``ws`` is the WebSocketApp that delivered the frame; it is not used here.
    """
    manipulation(json.loads(message))

# NOTE(review): this global shadows the name of the stdlib `socket` module.
socket = "wss://stream.binance.com:9443/stream?streams=" + assets


def on_error(ws, error):
    """Print websocket failures so a dead stream is visible in the cell output."""
    print(f"WebSocket error: {error!r}")


def on_close(ws, *close_info):
    """Print that the stream ended so a stalled pipeline is noticed.

    Accepts any trailing arguments because the close-callback arity differs
    between websocket-client versions.
    """
    print(f"WebSocket closed: {close_info}")


ws = websocket.WebSocketApp(
    socket,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close,
)


def run_ws():
    """Thread target: run the WebSocketApp's run_forever() loop."""
    ws.run_forever()


# Run the websocket loop in a background thread so this cell returns immediately.
ws_thread = threading.Thread(target=run_ws)
ws_thread.start()


{'event_time': '2024-05-26T17:52:20.453000', 'symbol': 'BTCUSDT', 'kline_start_time': '2024-05-26T17:52:00', 'kline_close_time': '2024-05-26T17:52:59.999000', 'interval': '1m', 'first_trade_id': 3614460694, 'last_trade_id': 3614460818, 'open_price': 68825.17, 'close_price': 68819.37, 'high_price': 68825.18, 'low_price': 68819.37, 'base_asset_volume': 1.01065, 'number_of_trades': 125, 'is_kline_closed': False, 'quote_asset_volume': 69553.4296043, 'taker_buy_base_asset_volume': 0.07562, 'taker_buy_quote_asset_volume': 5204.3342055, 'ignore': '0'}
{'event_time': '2024-05-26T17:52:22.833000', 'symbol': 'BTCUSDT', 'kline_start_time': '2024-05-26T17:52:00', 'kline_close_time': '2024-05-26T17:52:59.999000', 'interval': '1m', 'first_trade_id': 3614460694, 'last_trade_id': 3614460820, 'open_price': 68825.17, 'close_price': 68819.37, 'high_price': 68825.18, 'low_price': 68819.37, 'base_asset_volume': 1.01619, 'number_of_trades': 127, 'is_kline_closed': False, 'quote_asset_volume': 69934.6889156,

In [2]:
from kafka import KafkaConsumer
import psycopg2
import json
from datetime import datetime

import os

# Kafka consumer configuration. The broker address defaults to the local broker;
# set KAFKA_BOOTSTRAP_SERVERS to target a different cluster.
# NOTE(review): no group_id is set, so offsets are never committed and every run
# starts from kafka-python's default auto_offset_reset; records produced while
# this cell is stopped are likely never loaded. Confirm this is intended.
consumer = KafkaConsumer(
    'bitcoin-data',
    bootstrap_servers=os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'),
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# PostgreSQL connection configuration. No password is stored in the notebook:
# because none is passed, libpq reads it from the PGPASSWORD environment variable
# or the ~/.pgpass file. Database, user and host keep their previous values
# unless PGDATABASE / PGUSER / PGHOST are set.
conn = psycopg2.connect(
    dbname=os.environ.get('PGDATABASE', 'Bitcoin'),
    user=os.environ.get('PGUSER', 'postgres'),
    host=os.environ.get('PGHOST', 'localhost'),
)
cur = conn.cursor()

# Create the target table on first run; IF NOT EXISTS leaves an existing table
# untouched. Column names mirror the keys of the records built in the first cell.
_create_table_sql = """
CREATE TABLE IF NOT EXISTS bitcoin_data (
    event_time TIMESTAMP,
    symbol TEXT,
    kline_start_time TIMESTAMP,
    kline_close_time TIMESTAMP,
    interval TEXT,
    first_trade_id BIGINT,
    last_trade_id BIGINT,
    open_price FLOAT,
    close_price FLOAT,
    high_price FLOAT,
    low_price FLOAT,
    base_asset_volume FLOAT,
    number_of_trades INT,
    is_kline_closed BOOLEAN,
    quote_asset_volume FLOAT,
    taker_buy_base_asset_volume FLOAT,
    taker_buy_quote_asset_volume FLOAT,
    ignore TEXT
)
"""
cur.execute(_create_table_sql)
conn.commit()

# Records carry timestamps as ISO-8601 strings (produced with isoformat() in the
# first cell); turn them back into datetime objects for the TIMESTAMP columns.
def convert_timestamp(ts):
    """Parse an ISO-8601 timestamp string into a naive ``datetime``."""
    parsed = datetime.fromisoformat(ts)
    return parsed

# Consume messages and load them into PostgreSQL.
# The consumer is created without consumer_timeout_ms, so iterating over it
# blocks indefinitely and the cell is stopped by interrupting the kernel. The
# cleanup therefore lives in `finally` so it also runs on KeyboardInterrupt or
# on a database error.

# Single source of truth for the insert: column order here defines both the SQL
# column list and the parameter tuple, so they cannot drift apart.
_INSERT_COLUMNS = (
    'event_time', 'symbol', 'kline_start_time', 'kline_close_time', 'interval',
    'first_trade_id', 'last_trade_id', 'open_price', 'close_price', 'high_price',
    'low_price', 'base_asset_volume', 'number_of_trades', 'is_kline_closed',
    'quote_asset_volume', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume',
    'ignore',
)
# These fields arrive as ISO-8601 strings and must be parsed before insert.
_TIMESTAMP_COLUMNS = frozenset({'event_time', 'kline_start_time', 'kline_close_time'})
_INSERT_SQL = "INSERT INTO bitcoin_data ({}) VALUES ({})".format(
    ", ".join(_INSERT_COLUMNS),
    ", ".join(["%s"] * len(_INSERT_COLUMNS)),
)

try:
    for message in consumer:
        data = message.value
        row = tuple(
            convert_timestamp(data[col]) if col in _TIMESTAMP_COLUMNS else data[col]
            for col in _INSERT_COLUMNS
        )
        cur.execute(_INSERT_SQL, row)
        conn.commit()
finally:
    # Clean up; an uncommitted transaction is discarded when the connection closes.
    consumer.close()
    cur.close()
    conn.close()


KeyboardInterrupt: 