#### Binance Project

This project is dedicated to collecting accurate price and timestamp data from Binance using both the REST API and the WebSocket stream. The goal is to build a reliable data pipeline that extracts raw market data, transforms it into a clean and usable format, loads it into the appropriate storage layers, and ultimately analyzes it to uncover insights.

To maintain clarity and structure throughout the development process, the documentation and codebase will be organized into four main sections: data extraction, data transformation, data loading, and data analysis. Each section will detail the methods, tools, and design choices involved, creating a clear end-to-end overview of the entire workflow.

In [1]:
import websocket
import json
import pandas as pd
from binance.client import Client
import time

In [2]:
import os

# Shared state for the WebSocket callbacks: on_message appends to ws_data and
# reads start_time / duration_time; start_ws() assigns the last three names.
ws_data = []
start_time = None
duration_time = None
ws = None

# Kline stream to subscribe to (lower-case symbol, 1-second candles).
symbol = "btcusdc"
interval = "1s"
socket_info = f"wss://stream.binance.com:9443/ws/{symbol}@kline_{interval}"

# SECURITY: credentials must not live in the notebook. They are read from the
# environment instead; any key that was ever committed in source should be
# revoked and regenerated.
# NOTE(review): python-binance is expected to accept None keys for
# unauthenticated market-data calls such as historical klines - confirm.
api_key = os.environ.get("BINANCE_API_KEY")
secret_api_key = os.environ.get("BINANCE_API_SECRET")
client = Client(api_key, secret_api_key)

# Arguments for the REST historical-klines request.
btc = "BTCUSDC"
time_frame = "1s"
lookback_period = "1 minute ago"


In [3]:

def on_message(ws, msg):
    """Handle one WebSocket message from the kline stream.

    Decodes the JSON payload, records the kline's "t" value and its "c" value
    (as a float) in the module-level ``ws_data`` list when the kline's "x"
    flag is truthy, prints a heartbeat line, and closes the socket once
    ``duration_time`` seconds have elapsed since ``start_time``.

    Args:
        ws: The WebSocket app object; only ``ws.close()`` is used.
        msg: Raw JSON text of the message.
    """
    payload = json.loads(msg)
    kline = payload["k"]

    # NOTE(review): "x" is presumably Binance's "kline is closed" flag, so only
    # finished candles are stored - confirm against the stream documentation.
    if kline["x"]:
        record = {
            "timestamp": kline["t"],
            "price_ws": float(kline["c"]),
        }
        ws_data.append(record)

    print("Data stream is intact and running, wait for the desired duration to end")

    # The time limit is only checked when a message arrives.
    elapsed = time.time() - start_time
    if elapsed < duration_time:
        return

    print(f"Reached {duration_time} seconds. Closing WebSocket...")
    ws.close()

def on_error(ws, error):
    """Print a WebSocket error to stdout, prefixed with ``ERROR:``.

    Args:
        ws: The WebSocket app object (unused).
        error: The error value to report.
    """
    prefix = "ERROR:"
    print(prefix, error)

def on_close(ws, code, msg):
    """Print a fixed notice when the WebSocket connection closes.

    Args:
        ws: The WebSocket app object (unused).
        code: Close status code (unused).
        msg: Close message (unused).
    """
    notice = "CONNECTION IS CLOSED"
    print(notice)

def start_ws(seconds:int):
    """Open the kline WebSocket and run it until the connection ends.

    Sets the module-level ``start_time``, ``duration_time`` and ``ws`` that
    ``on_message`` relies on to decide when to close the socket.

    Args:
        seconds: How long, in seconds, the stream should be kept open.
    """
    global start_time,duration_time,ws

    duration_time = seconds
    start_time = time.time()

    callbacks = {
        "on_message": on_message,
        "on_error": on_error,
        "on_close": on_close,
    }
    ws = websocket.WebSocketApp(socket_info, **callbacks)
    ws.run_forever()

# Collect WebSocket klines for 60 seconds.
start_ws(60)

# Naming the columns explicitly keeps the frame well-formed even when no
# kline was recorded (e.g. the connection failed); without it the
# "timestamp" lookup below raises KeyError on an empty ws_data.
df_ws = pd.DataFrame(ws_data, columns=["timestamp", "price_ws"])
# The stored timestamps are epoch milliseconds.
df_ws["timestamp"] = pd.to_datetime(df_ws["timestamp"], unit="ms")

def get_btc_data(btc,time_frame,lookback_period):
    """Fetch historical klines over REST and return timestamp/price pairs.

    Args:
        btc: Symbol passed to ``client.get_historical_klines``.
        time_frame: Kline interval passed to the same call.
        lookback_period: Start of the requested window, passed to the same call.

    Returns:
        DataFrame with two columns: ``timestamp`` (the kline close time,
        floored to the second) and ``price_rest`` (the close price rounded
        to 2 decimals).
    """
    kline_columns = [
        "timestamp", "open", "high", "low", "close", "volume", "close_time",
        "quote_asset_volume", "number_of trades", "taker_buy_base_asset_volume",
        "taker_buy_quote_asset_volume", "ignore",
    ]

    raw_klines = client.get_historical_klines(btc, time_frame, lookback_period)
    klines = pd.DataFrame(raw_klines, columns=kline_columns)

    # The original "timestamp" column is overwritten with the close time
    # (epoch milliseconds), truncated to whole seconds.
    close_seconds = pd.to_datetime(klines["close_time"], unit="ms").dt.floor("s")
    klines["timestamp"] = close_seconds

    close_price = pd.to_numeric(klines["close"]).round(2)
    klines["price_rest"] = close_price

    return klines[["timestamp", "price_rest"]]

# REST prices for the configured symbol, interval and lookback window.
df_rest = pd.DataFrame(get_btc_data(btc, time_frame, lookback_period))

# Keep only the timestamps present in both the REST and WebSocket frames.
df_final = df_rest.merge(df_ws, how="inner", on="timestamp")


Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration to end
Data stream is intact and running, wait for the desired duration

In [4]:
# Highest price per source. The leading 0 is the same baseline the manual
# loops used, so an empty column still yields 0; builtin max() replaces the
# hand-rolled scans and no longer leaves a stray `price` loop variable behind.
max_price_rest = max([0, *df_rest["price_rest"]])
max_price_ws = max([0, *df_ws["price_ws"]])

def high(max_price_rest, max_price_ws):
    """Describe which source reported the higher maximum price.

    Args:
        max_price_rest: Highest price seen in the REST data.
        max_price_ws: Highest price seen in the WebSocket data.

    Returns:
        A sentence naming the larger value, or stating that both are equal.
    """
    if max_price_rest == max_price_ws:
        return f"The Highest price from both is equal and is {max_price_rest}"
    if max_price_rest > max_price_ws:
        return f"Price from REST is bigger and its {max_price_rest}"
    return f"Price from WS is bigger and its {max_price_ws}"

# Lowest price per source. Seeding with +inf keeps the same baseline the
# manual loops used (an empty column yields inf); builtin min() replaces the
# redundant "if price < current: current = min(price, current)" scans.
min_price_rest = min([float("inf"), *df_rest["price_rest"]])
min_price_ws = min([float("inf"), *df_ws["price_ws"]])

def low(min_price_ws, min_price_rest):
    """Describe which source reported the lower minimum price.

    Args:
        min_price_ws: Lowest price seen in the WebSocket data.
        min_price_rest: Lowest price seen in the REST data.

    Returns:
        A sentence naming the smaller value, or stating that both are equal.
    """
    # Compare the two sources with each other; the previous check compared
    # min_price_ws with itself, so the "equal" branch was always taken.
    if min_price_ws == min_price_rest:
        return f"The Lowest price from both is equal and is {min_price_ws}"
    if min_price_ws > min_price_rest:
        return f" Price from REST is lower and is {min_price_rest}"
    return f" Price from WS is lower and is {min_price_ws}"

def volume(arr):
    """Report how many entries ``arr`` holds.

    Args:
        arr: Any sized collection (the notebook passes the WebSocket prices).

    Returns:
        A sentence containing ``len(arr)``.
    """
    count = len(arr)
    return f"The volume for WS per minute is {count}"
            
# Mean over every REST and WS price in the merged frame, rounded to 2 decimals.
# Single quotes inside the replacement field: reusing the f-string's own double
# quotes there is a SyntaxError on Python versions before 3.12.
mean = f"The overall mean is {float(df_final[['price_rest', 'price_ws']].stack().mean().round(2))}"

In [5]:
# Report which source saw the higher maximum price.
high(max_price_rest,max_price_ws)

'The Highest price from both is equal and is 86477.93'

In [6]:
# Pass by keyword: low() is declared as low(min_price_ws, min_price_rest), so
# the previous positional call (REST first) fed each minimum into the other
# source's parameter.
low(min_price_ws=min_price_ws, min_price_rest=min_price_rest)

'The Lowest price from both is equal and is 86430.37'

In [7]:
# Count the WebSocket price rows that were collected.
volume(df_ws["price_ws"])

'The volume for WS per minute is 60'

In [8]:
# Display the precomputed overall-mean sentence.
mean

'The overall mean is 86451.25'

In [9]:
# Moving averages are computed for both the REST and the WS prices in case the two sources differ.


In [10]:
# 10- and 20-row rolling means of each price column, rounded to 2 decimals.
# Columns are added in the order 10MA_rest, 20MA_rest, 10MA_ws, 20MA_ws.
df_final = df_final.assign(
    **{
        f"{window}MA_{source}": df_final[f"price_{source}"].rolling(window).mean().round(2)
        for source in ("rest", "ws")
        for window in (10, 20)
    }
)


In [11]:
# Display the merged frame including the moving-average columns.
df_final


Unnamed: 0,timestamp,price_rest,price_ws,10MA_rest,20MA_rest,10MA_ws,20MA_ws
0,2025-11-24 13:32:44,86459.94,86459.94,,,,
1,2025-11-24 13:32:45,86459.94,86459.94,,,,
2,2025-11-24 13:32:46,86459.94,86459.94,,,,
3,2025-11-24 13:32:47,86465.08,86465.08,,,,
4,2025-11-24 13:32:48,86465.08,86465.08,,,,
5,2025-11-24 13:32:49,86475.36,86475.36,,,,
6,2025-11-24 13:32:50,86477.93,86477.93,,,,
7,2025-11-24 13:32:51,86477.93,86477.93,,,,
8,2025-11-24 13:32:52,86473.01,86473.01,,,,
9,2025-11-24 13:32:53,86471.01,86471.01,86468.52,,86468.52,


In [13]:
!pip install mysql-connector-python
!pip install SQLAlchemy

Collecting mysql-connector-python
  Downloading mysql_connector_python-9.5.0-cp313-cp313-win_amd64.whl.metadata (7.7 kB)
Downloading mysql_connector_python-9.5.0-cp313-cp313-win_amd64.whl (16.5 MB)
   ---------------------------------------- 0.0/16.5 MB ? eta -:--:--
   ---- ----------------------------------- 1.8/16.5 MB 8.8 MB/s eta 0:00:02
   -------- ------------------------------- 3.4/16.5 MB 8.0 MB/s eta 0:00:02
   --------------- ------------------------ 6.3/16.5 MB 10.1 MB/s eta 0:00:02
   -------------------- ------------------- 8.4/16.5 MB 10.8 MB/s eta 0:00:01
   ----------------------------- ---------- 12.1/16.5 MB 11.6 MB/s eta 0:00:01
   ------------------------------------- -- 15.5/16.5 MB 12.5 MB/s eta 0:00:01
   ---------------------------------------- 16.5/16.5 MB 11.8 MB/s eta 0:00:00
Installing collected packages: mysql-connector-python
Successfully installed mysql-connector-python-9.5.0


In [2]:
from flask import Flask, jsonify,request
from threading import Thread
import mysql.connector
from sqlalchemy import create_engine

In [19]:
import os
from urllib.parse import quote_plus

# SECURITY: the database password must not be embedded in the notebook.
# Connection settings come from the environment; MYSQL_PASSWORD is required
# (KeyError if unset), the others fall back to the previous values.
# quote_plus() escapes characters such as "!" or "@" that are special in URLs.
eng = create_engine(
    "mysql+mysqlconnector://{user}:{password}@{host}/{database}".format(
        user=quote_plus(os.environ.get("MYSQL_USER", "root")),
        password=quote_plus(os.environ["MYSQL_PASSWORD"]),
        host=os.environ.get("MYSQL_HOST", "localhost"),
        database=os.environ.get("MYSQL_DATABASE", "binance_project"),
    )
)

In [21]:
# Append the merged rows to the btc_data table without writing the index.
df_final.to_sql(name="btc_data", con=eng, if_exists="append", index=False)

56

In [4]:
# Flask application that the /btc route below is registered on.
app = Flask(__name__)

def get_db_connection():
    """Open a new MySQL connection to the project database.

    Connection settings are read from the environment so that no password is
    stored in source: ``MYSQL_PASSWORD`` is required, while ``MYSQL_HOST``,
    ``MYSQL_USER`` and ``MYSQL_DATABASE`` default to ``localhost``, ``root``
    and ``binance_project``.

    Returns:
        The connection returned by ``mysql.connector.connect``; the caller is
        responsible for closing it.

    Raises:
        KeyError: If ``MYSQL_PASSWORD`` is not set.
    """
    import os

    return mysql.connector.connect(
        host=os.environ.get("MYSQL_HOST", "localhost"),
        user=os.environ.get("MYSQL_USER", "root"),
        password=os.environ["MYSQL_PASSWORD"],
        database=os.environ.get("MYSQL_DATABASE", "binance_project"),
    )

@app.route("/btc", methods=["GET"])
def get_btc_data():
    """Serve rows of the ``btc_data`` table as JSON.

    Query parameters (all optional):
        type:  ``rest`` or ``ws`` to return only that price column; any other
               value returns both price columns.
        start: Inclusive lower bound on ``timestamp``.
        end:   Inclusive upper bound on ``timestamp``.
        limit: Maximum number of rows; must be a non-negative integer.

    Returns:
        A JSON list of row dicts ordered by ``timestamp`` ascending, or a
        JSON error with HTTP status 400 when ``limit`` is not valid.
    """
    # NOTE(review): an earlier cell defines a REST helper with this same name;
    # running this cell rebinds the module-level name to the route handler.
    data_type = request.args.get("type")
    start = request.args.get("start")
    end = request.args.get("end")
    limit = request.args.get("limit")

    # The column list is chosen from fixed strings only, so it is safe to
    # interpolate; every user-supplied value is bound as a parameter below.
    if data_type == "rest":
        select_cols = "price_rest, timestamp"
    elif data_type == "ws":
        select_cols = "price_ws, timestamp"
    else:
        select_cols = "price_rest, price_ws, timestamp"

    query = f"SELECT {select_cols} FROM btc_data"
    conditions = []
    params = []

    if start:
        conditions.append("timestamp >= %s")
        params.append(start)
    if end:
        conditions.append("timestamp <= %s")
        params.append(end)

    if conditions:
        query += " WHERE " + " AND ".join(conditions)

    query += " ORDER BY timestamp ASC"

    if limit:
        # Reject anything that is not a plain non-negative integer instead of
        # splicing raw request text into the SQL statement.
        try:
            limit_value = int(limit)
        except ValueError:
            limit_value = -1
        if limit_value < 0:
            return jsonify({"error": "limit must be a non-negative integer"}), 400
        query += " LIMIT %s"
        params.append(limit_value)

    conn = get_db_connection()
    try:
        cursor = conn.cursor(dictionary=True)
        try:
            cursor.execute(query, tuple(params))
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()

    return jsonify(rows)

In [5]:
def run_flask():
    """Run the Flask app on 127.0.0.1:5000 with debug and the reloader off."""
    server_options = {
        "host": "127.0.0.1",
        "port": 5000,
        "debug": False,
        "use_reloader": False,
    }
    app.run(**server_options)


In [6]:
# Start run_flask on a separate thread.
Thread(target=run_flask).start()

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit
