In [1]:
import pandas as pd
from binance.client import Client
import websocket
import json
import threading
import os
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
class BinanceWebSocketConnection:
    def __init__(self, url, timer = 1440, max_retries = 10):
        self.url = url
        self.timer = timer
        self.websocket_data = []
        self.retries = 0
        self.max_retries = max_retries
        
    def on_message(self, ws, message):
        try:
            data = json.loads(message)
            if "k" in data:
                wdata = data['k']
                timestamp = pd.to_datetime(wdata['t'], unit='ms')
                price = float(wdata['c'])
                self.websocket_data.append((timestamp, price))
        except Exception as e:
            print(f"Error in on_message: {e}")    
    def on_open(self, ws):
        try:
            payload = {"method": "SUBSCRIBE",
                    "params": ["btcusdt@kline_1s"],
                       "id": 1}
            ws.send(json.dumps(payload))
            time = threading.Timer(self.timer, ws.close)
            time.start()
        except Exception as e:
            print(f"Error in on_open: {e}")

    def on_error(self, ws, error):
        print("Error:", error)
        self.reconnect_on_error()
        
    def on_close(self, ws, close_status, close_message):
        print("WebSocket closed")
        
    def reconnect_on_error(self):
        while self.retries < self.max_retries:
            try:
                reconnection_time = 2 ** self.retries
                time.sleep(reconnection_time)
                self.get_websocket_data()
                self.retries += 1
            except Exception as e:
                print(f"Unsuccessful reconnection")
                
    def get_websocket_data(self):
        ws = websocket.WebSocketApp(self.url,
                                    on_open=self.on_open,
                                    on_message=self.on_message,
                                    on_error=self.on_error,
                                    on_close=self.on_close )
        ws.run_forever()

    def thread(self):
        thread = threading.Thread(target=self.get_websocket_data, daemon = True)
        thread.start()
        return thread

In [3]:
class BinanceAPIConnection:
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret
        self.client = Client(self.api_key, self.api_secret)

    def load_klines(self, symbol, interval, lookback):
        try:
            klines = self.client.get_historical_klines(symbol, interval, lookback)
            return klines
        except Exception as e:
            raise Exception(f"API call failure :{e}")

In [4]:
class DataTransformation:
    def transform_data(self, klines):
        if not klines:
            raise Exception(f"Kline data is missing") 
        try:
            selected_data = [(i[0], float(i[4])) for i in klines]
            raw_df = pd.DataFrame(selected_data, columns=["timestamp", "price"])
            raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"], unit = "ms")
            return raw_df
        except Exception as e:
            raise Exception(f"Data transformation failure: {e}") 
            
    def data_metrics_analysis(self,df):
        if df.empty:
            raise Exception(f"Dataframe is empty") 
        try:
            df["timestamp"] = pd.to_datetime(df["timestamp"], unit = "ms")
            df['time_m'] = df['timestamp'].dt.floor('min')
            df = df[df["time_m"] > min(df["time_m"])]
            
            dp_df = df["time_m"].value_counts().sort_index().reset_index()
            dp_df.columns = ["time_m", "Volume"]
            dp_df = dp_df.head(10)
    
            dp_df["Highest"] = dp_df["Volume"].cummax()
            dp_df["Lowest"] = dp_df["Volume"].cummin()
            dp_df["Mean"] = dp_df["Volume"].expanding().mean()

            moving_avg = df.groupby("time_m")["price"].mean().reset_index()
            moving_avg["Moving_avg_10"] = moving_avg["price"].rolling(window=10).mean()
            moving_avg["Moving_avg_20"] = moving_avg["price"].rolling(window=20).mean()
            return dp_df, moving_avg
        except Exception as e:
            print(f"Data transformation error: {e}")

In [5]:
ws = BinanceWebSocketConnection("wss://stream.binance.com:9443/ws", timer = 1440, max_retries = 10)
ws_thread = ws.thread()
ws_thread.join()

api_key = os.getenv("BINANCE_API_KEY")
api_secret = os.getenv("BINANCE_API_SECRET")

if not api_key or not api_secret:
    raise Exception("Missing API credentials.")

client = BinanceAPIConnection(api_key, api_secret)
symbol = "BTCUSDT"
interval = "1s"
lookback = "1700 sec ago UTC"
data = client.load_klines(symbol, interval, lookback)

raw_df = DataTransformation().transform_data(data)
metrics_df_api, moving_avg_df_api = DataTransformation().data_metrics_analysis(raw_df)

main_df = pd.DataFrame(ws.websocket_data, columns=["timestamp", "price"])
metrics_df_ws, moving_avg_df_ws = DataTransformation().data_metrics_analysis(main_df)


metrics_df_ws

WebSocket closed


Unnamed: 0,time_m,Volume,Highest,Lowest,Mean
0,2025-07-02 09:30:00,60,60,60,60.0
1,2025-07-02 09:31:00,60,60,60,60.0
2,2025-07-02 09:32:00,60,60,60,60.0
3,2025-07-02 09:33:00,60,60,60,60.0
4,2025-07-02 09:34:00,60,60,60,60.0
5,2025-07-02 09:35:00,60,60,60,60.0
6,2025-07-02 09:36:00,60,60,60,60.0
7,2025-07-02 09:37:00,60,60,60,60.0
8,2025-07-02 09:38:00,60,60,60,60.0
9,2025-07-02 09:39:00,60,60,60,60.0
