# Binance Aggregation Trade Stream Websocket (Futures) V2.0

### This Script Gets the Aggregation data from Binance & Saves it as a CSV file every 30 Minutes

In [None]:
import websocket
import pandas as pd
import json 
import threading  
import time  

# Create an empty DataFrame with specified columns using pandas
df_agg = pd.DataFrame(columns=["Symbol", "Price", "Quantity", "Is Buyer Market Maker"])  
# Initialize ws variable as None
ws = None  
# Set save_interval variable to 1800 (seconds) for saving data at regular intervals (1 hour)
save_interval = 1800  


# Define a function named save_and_clear_df to Save the Data Recivied as a CSV
def save_and_clear_df():  
    while True:
        time.sleep(save_interval)
        current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S") 
        df_agg.to_csv(f"Aggregate-{current_time}.csv")  
        print(f"(Agg) Chapter Saved at {current_time}") 
        df_agg.drop(df_agg.index, inplace=True) 
        print(f"(Agg) Clearing Dataframe at {current_time}") 

# Create a thread to save and clear DataFrame
save_thread = threading.Thread(target=save_and_clear_df) 
save_thread.daemon = True  # Set the thread as daemon so it terminates when the main thread exits
save_thread.start()  


# Define a function named on_message with parameters ws and message
def on_message(ws, message):  
    msg = json.loads(message)  # Parse the incoming message as JSON
    if msg["e"] == "aggTrade":  # Check if the message event is an aggregate trade
        stream_agg_trade(msg)  # Call stream_agg_trade function with the message

# Define a function named on_error with parameters ws and error
def on_error(ws, error):  
    print(error)  
    current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")  
    df_agg.to_csv(f"Aggregate-{current_time}.csv") 

# Define a function named on_close with parameter ws
def on_close(ws):  
    print("### Closed ###")
    current_time = pd.Timestamp.now().strftime("%Y-%m-%d_%H-%M-%S")
    df_agg.to_csv(f"Aggregate-{current_time}.csv") 

def on_open(ws):  # Define a function named on_open with parameter ws
    print("### Opened ###")  # Print a message indicating WebSocket is opened

# Define a function named stream_agg_trade with parameter msg to extract the data into a dataframe 
def stream_agg_trade(msg):  
    event_time = pd.to_datetime(msg["E"], unit='ms')  # Convert event time to a pandas Timestamp
    symbol = msg["s"]  # Extract symbol from message
    price = float(msg["p"])  # Extract price from message and convert to float
    quantity = float(msg["q"])  # Extract quantity from message and convert to float
    is_buyer_market_maker = msg["m"]  # Extract if buyer is a market maker from message
    print(f"AGGREGATION: Event Time: {event_time} | Symbol: {symbol} | Price: {price} | Quantity: {quantity} | Is Buyer Market Maker: {is_buyer_market_maker}", end="\r")  # Print aggregated trade information
    df_agg.loc[event_time] = [symbol, price, quantity, is_buyer_market_maker]  # Add aggregated trade information to DataFrame

# Define the WebSocket URL for Aggregate Trade Streams
socket_url = "wss://fstream.binance.com/ws/btcusdt@aggTrade"

# Create and run the WebSocket connection in a separate thread
def run_websocket(): 
    websocket.enableTrace(False)  # Disable websocket trace
    global ws 
    ws = websocket.WebSocketApp(socket_url, on_message=on_message, on_error=on_error, on_close=on_close)  # Create WebSocketApp instance with specified URL and event handlers
    ws.on_open = on_open  # Set on_open handler
    ws.run_forever()  # Start the WebSocket connection and run indefinitely

def stop_websocket():  # Define a function named stop_websocket
    global ws  # Access the global ws variable
    if ws:  # Check if WebSocket connection exists
        ws.close()  # Close the WebSocket connection if it's open

websocket_thread = threading.Thread(target=run_websocket)  # Create a thread targeting the run_websocket function
websocket_thread.start()  # Start the thread
