In [3]:
import json
import websocket
import pandas as pd
import os
import threading
import time

# --- Collection settings ---
# CSV file that collected rows are appended to
csv_file = 'cryptocurrency_data_updated.csv'
# Collection stops once this many rows have been saved
max_rows = 500

# --- Shared mutable state used by the WebSocket callbacks ---
# Number of rows saved so far
row_counter = 0
# Set to True to ask the WebSocket thread to stop
stop_thread = False


In [5]:
# Short kline field codes -> readable column names (built once, not per message)
KLINE_KEY_MAPPING = {
    "t": "Start Time",
    "T": "End Time",
    "s": "Symbol",
    "i": "Interval",
    "f": "First Trade ID",
    "L": "Last Trade ID",
    "o": "Open Price",
    "c": "Close Price",
    "h": "High Price",
    "l": "Low Price",
    "v": "Base Asset Volume",
    "n": "Number of Trades",
    "x": "Is this the Final Kline?",
    "q": "Quote Asset Volume",
    "V": "Taker Buy Base Asset Volume",
    "Q": "Taker Buy Quote Asset Volume",
    "B": "Ignore"
}


# Define the function to handle incoming WebSocket messages
def on_message(ws, message):
    """Handle one incoming message: append a row to the CSV and enforce the row limit.

    Args:
        ws: The WebSocket app object that delivered the message. It is closed
            from here as soon as collection should stop.
        message: Raw JSON text. Expected to be a dict with a 'stream' entry and
            a 'data' entry holding the kline under 'k'; anything else is ignored.

    Side effects:
        Appends one row to ``csv_file``, increments ``row_counter`` and, once
        ``row_counter`` reaches ``max_rows``, calls ``stop_websocket()`` and
        closes ``ws``.
    """
    global row_counter

    # A stop was requested (row limit or manual flag). Setting the flag does not
    # by itself end the connection, so close the socket here and drop the
    # message instead of writing rows past the limit.
    if stop_thread:
        ws.close()
        return

    message = json.loads(message)

    # Ignore payloads that do not carry a kline instead of raising KeyError.
    data = message.get('data') if isinstance(message, dict) else None
    kline_data = data.get('k') if isinstance(data, dict) else None
    if not isinstance(kline_data, dict):
        return

    # Translate the short field codes; unknown codes are kept as they are.
    human_readable_kline = {
        KLINE_KEY_MAPPING.get(key, key): value for key, value in kline_data.items()
    }

    # Update the source
    source = {"stream": message.get('stream'), "kline_data": human_readable_kline}

    # Manipulate the data and save to CSV
    df = manipulation(source)

    # Write the header only when the file is missing or still empty.
    write_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0
    df.to_csv(csv_file, mode='a', header=write_header, index=False)

    # Update row counter
    row_counter += 1

    print(f"Data saved for {source['kline_data']['Symbol']} at {df['Timestamp'].iloc[0]}")

    # Check if the row limit is reached
    if row_counter >= max_rows:
        print("Row limit reached. Stopping WebSocket...")
        stop_websocket()
        # Closing the socket is what actually ends the blocking receive loop.
        ws.close()


In [7]:
# Turn one parsed kline message into a single-row DataFrame
def manipulation(source):
    """Build a one-row DataFrame from a parsed kline message.

    Args:
        source: Dict whose "kline_data" entry maps the readable field names
            ("Symbol", "Open Price", ..., "End Time") to their values.

    Returns:
        A DataFrame with a default integer index and the columns Timestamp,
        Symbol, Open Price, Close Price, High Price, Low Price, Volume,
        Number of Trades and Quote Asset Volume. Timestamp is "End Time" read
        as epoch milliseconds; every column other than Timestamp and Symbol is
        converted to float.
    """
    kline = source["kline_data"]

    # Output column name -> field name in the incoming kline dict
    field_by_column = {
        "Symbol": "Symbol",
        "Open Price": "Open Price",
        "Close Price": "Close Price",
        "High Price": "High Price",
        "Low Price": "Low Price",
        "Volume": "Base Asset Volume",
        "Number of Trades": "Number of Trades",
        "Quote Asset Volume": "Quote Asset Volume",
    }
    row = {column: kline[field] for column, field in field_by_column.items()}

    # The row is stamped with the kline's 'End Time' (T), given in milliseconds
    close_time = pd.to_datetime(kline["End Time"], unit='ms')

    # Index the single row by its timestamp and label that index 'Timestamp'
    frame = pd.DataFrame(
        [row],
        columns=list(field_by_column),
        index=[close_time],
    ).rename_axis('Timestamp')

    # Everything except the symbol is numeric and stored as float
    float_columns = [name for name in frame.columns if name != "Symbol"]
    frame[float_columns] = frame[float_columns].astype(float)

    # Move the timestamp out of the index into an ordinary first column
    return frame.reset_index()


In [9]:
# Define the function to start WebSocket
def start_websocket():
    """Stream 1-minute klines for BTCUSDT, ETHUSDT and BNBUSDT until a stop is requested.

    Builds the combined-stream URL, forwards every message to ``on_message``
    and reconnects after a dropped connection. The function returns once the
    module-level ``stop_thread`` flag is true; the socket is always closed on
    the way out.
    """
    assets = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
    streams = '/'.join(coin.lower() + '@kline_1m' for coin in assets)
    socket = "wss://stream.binance.com:9443/stream?streams=" + streams

    def _on_message_until_stopped(ws, message):
        # run_forever() blocks for as long as the connection is open, so the
        # stop flag has to be honoured from inside the callback: closing the
        # socket is what makes run_forever() return.
        if not stop_thread:
            on_message(ws, message)
        if stop_thread:
            ws.close()

    ws = websocket.WebSocketApp(socket, on_message=_on_message_until_stopped)

    try:
        # Loop to maintain the WebSocket connection
        while not stop_thread:
            try:
                ws.run_forever()
            except Exception as e:
                print(f"Error: {e}")
            if stop_thread:
                break
            # run_forever() can also return without raising when the
            # connection drops or cannot be opened; pause before reconnecting
            # so a network outage does not become a tight reconnect loop.
            time.sleep(1)
    finally:
        # Properly close the WebSocket when the loop stops
        ws.close()

# Request that the WebSocket loop stop
def stop_websocket():
    """Raise the module-level ``stop_thread`` flag and print a confirmation.

    This only sets the flag; it does not itself close any connection.
    """
    globals().update(stop_thread=True)
    print("WebSocket stopped.")


In [11]:
# Run WebSocket in a separate thread
# Re-running this cell while a collector is still alive would start a second
# thread appending to the same CSV, so only start one when none is running.
if 'thread' in globals() and thread.is_alive():
    print("WebSocket thread is already running.")
else:
    # Clear state left over from a previous run; otherwise a stale
    # stop_thread=True makes the new thread exit immediately and a stale
    # row_counter trips the row limit on the first message.
    stop_thread = False
    row_counter = 0
    thread = threading.Thread(target=start_websocket)
    thread.start()


In [13]:
# Manually stop the WebSocket (if needed)
# Setting this module-level flag asks the worker thread to stop: the loop in
# start_websocket() only keeps running while stop_thread is false. Unlike
# stop_websocket(), this assignment prints no confirmation message.
stop_thread = True
