# Real Time Data Stream

This project is to start a real time data feed and build infrastructure needed to maintain it.

The subject of our data stream will be financial data, specifically for Crypto which has as 24/7 market.

### Outline:
1) Database
2) API
3) Data Storage
4) Analysis
5) Visualization

## Initialization

In [16]:
import pandas as pd
import numpy as np
import sqlite3 #needed for database
#datastream imports
import websocket
import json
import threading
import time
import atexit
from datetime import datetime, timezone

## Database

SQLite database to store the price data.

In [17]:
conn = sqlite3.connect('db/pricing_data.db', check_same_thread=False) #establish connection to the database and enable multithreading
c = conn.cursor() #create a cursor object

#creating the pricing data table with predefined schema
c.execute('''
    CREATE TABLE IF NOT EXISTS price_ticks(
          timestamp TEXT,
          symbol TEXT,
          price REAL,
          volume REAL,
          recieved_at TEXT)
    ''')
c.execute("DELETE FROM price_ticks")

conn.commit() #save these changes

## API

Selecting a data provider that provides real time, free to publish data.

Of the options, I am deciding on Finnhub. Other contenders were Alpha Vantage.

In [None]:
API_KEY = 'd0amcgpr01qm3l9meas0d0amcgpr01qm3l9measg'
SYMBOL = 'BINANCE:ETHUSDT'

#temporary row max logic
row_counter = 0
MAX_ROWS = 10 #this doesnt seem to be wokring!!

#recieves trades from the websocket, reformats, and prints it
def on_message(ws, message):
    global row_counter
    data = json.loads(message) #converts the json string from the websocket into a python object
    if data.get('type') == 'trade': #checks to make sure that the data is trades
        rows = []
        received_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        for t in data['data']: #loops through all trades recieved
            if row_counter >= MAX_ROWS:
                print(f"{MAX_ROWS} rows reached — closing websocket")
                ws.close()
                return
            trade_time = datetime.fromtimestamp(t['t'] / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")  #reformats date
            #append trades to a row and print
            row = (trade_time, t['s'], t['p'], t['v'], received_at)
            rows.append(row)
            row_counter += 1
            print(f"current row count at {row_counter}")
            print(f"inserted{row}")

        c.executemany("INSERT INTO price_ticks (timestamp, symbol, price, volume, recieved_at) VALUES (?,?,?,?,?)", rows)
        conn.commit()

#error message
def on_error(ws, error):
    print("WebSocket error:", error)

#closing message
def on_close(ws):
    print("WebSocket closed")

#opening message
def on_open(ws):
    print("WebSocket connection opened")
    ws.send(json.dumps({
        "type": "subscribe",
        "symbol": SYMBOL
    }))

#closing DC connection message
def close_conn():
    print("Closing DB connection...")
    conn.close()
atexit.register(close_conn)

websocket.enableTrace(True) #denoises the data

#initializing the websocket
ws = websocket.WebSocketApp(f"wss://ws.finnhub.io?token={API_KEY}",
                            on_message=on_message,
                            on_error=on_error,
                            on_close=on_close)
ws.on_open = on_open #opens the websocket
ws.run_forever() #begins running the websocket

--- request header ---
GET /?token=d0amcgpr01qm3l9meas0d0amcgpr01qm3l9measg HTTP/1.1
Upgrade: websocket
Host: ws.finnhub.io
Origin: https://ws.finnhub.io
Sec-WebSocket-Key: wpd3NV70vJbANZkCkeAsVg==
Sec-WebSocket-Version: 13
Connection: Upgrade


-----------------------
--- response header ---
HTTP/1.1 101 Switching Protocols
Server: nginx/1.18.0
Date: Sat, 03 May 2025 20:01:36 GMT
Connection: upgrade
Upgrade: websocket
Sec-WebSocket-Accept: OExEp8OOAC8ui4pouFThYNSugaY=
-----------------------
Websocket connected
++Sent raw: b'\x81\xb2\x9d\xe2\x82\xcb\xe6\xc0\xf6\xb2\xed\x87\xa0\xf1\xbd\xc0\xf1\xbe\xff\x91\xe1\xb9\xf4\x80\xe7\xe9\xb1\xc2\xa0\xb8\xe4\x8f\xe0\xa4\xf1\xc0\xb8\xeb\xbf\xa0\xcb\x85\xdc\xac\xc1\x8e\xa7\xa7\xd6\x83\xc8\xb1\xc6\x9f\xbf\x9f'
++Sent decoded: fin=1 opcode=1 data=b'{"type": "subscribe", "symbol": "BINANCE:ETHUSDT"}'


WebSocket connection opened


++Rcv raw: b'\x81~\x00\xab{"data":[{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302496935,"v":1.6885},{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302496935,"v":0.3115}],"type":"trade"}'
++Rcv decoded: fin=1 opcode=1 data=b'{"data":[{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302496935,"v":1.6885},{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302496935,"v":0.3115}],"type":"trade"}'


current row count at 0
inserted('2025-05-03 20:01:36 UTC', 'BINANCE:ETHUSDT', 1839.2, 1.6885, '2025-05-03 20:01:38 UTC')
current row count at 1
inserted('2025-05-03 20:01:36 UTC', 'BINANCE:ETHUSDT', 1839.2, 0.3115, '2025-05-03 20:01:38 UTC')


++Rcv raw: b'\x81~\x01\xd3{"data":[{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302497764,"v":0.019},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0049},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0029},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0032},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0025},{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302498316,"v":0.0054}],"type":"trade"}'
++Rcv decoded: fin=1 opcode=1 data=b'{"data":[{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302497764,"v":0.019},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0049},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0029},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0032},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302498110,"v":0.0025},{"c":null,"p":1839.2,"s":"BINANCE:ETHUSDT","t":1746302498316,"v":0.0054}],"type":"trade"}'

current row count at 2
inserted('2025-05-03 20:01:37 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.019, '2025-05-03 20:01:39 UTC')
current row count at 3
inserted('2025-05-03 20:01:38 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0049, '2025-05-03 20:01:39 UTC')
current row count at 4
inserted('2025-05-03 20:01:38 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0029, '2025-05-03 20:01:39 UTC')
current row count at 5
inserted('2025-05-03 20:01:38 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0032, '2025-05-03 20:01:39 UTC')
current row count at 6
inserted('2025-05-03 20:01:38 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0025, '2025-05-03 20:01:39 UTC')
current row count at 7
inserted('2025-05-03 20:01:38 UTC', 'BINANCE:ETHUSDT', 1839.2, 0.0054, '2025-05-03 20:01:39 UTC')


++Rcv raw: b'\x81~\x04${"data":[{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0003},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0028},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.2596},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0449},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.006},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0028},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0028},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0028},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0029},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0032},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0049},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.0028},{"c":null,"p":1839.21,"s":"BINANCE:ETHUSDT","t":1746302499117,"v":0.6056},{"c":nu

current row count at 8
inserted('2025-05-03 20:01:39 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0003, '2025-05-03 20:01:40 UTC')
current row count at 9
inserted('2025-05-03 20:01:39 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.0028, '2025-05-03 20:01:40 UTC')
current row count at 10
inserted('2025-05-03 20:01:39 UTC', 'BINANCE:ETHUSDT', 1839.21, 0.2596, '2025-05-03 20:01:40 UTC')
10 rows reached — closing websocket
WebSocket error: on_close() takes 1 positional argument but 3 were given


False

In [19]:
pd.read_sql('''
SELECT *
FROM price_ticks

'''
, conn)

Unnamed: 0,timestamp,symbol,price,volume,recieved_at
0,2025-05-03 20:01:36 UTC,BINANCE:ETHUSDT,1839.2,1.6885,2025-05-03 20:01:38 UTC
1,2025-05-03 20:01:36 UTC,BINANCE:ETHUSDT,1839.2,0.3115,2025-05-03 20:01:38 UTC
2,2025-05-03 20:01:37 UTC,BINANCE:ETHUSDT,1839.21,0.019,2025-05-03 20:01:39 UTC
3,2025-05-03 20:01:38 UTC,BINANCE:ETHUSDT,1839.21,0.0049,2025-05-03 20:01:39 UTC
4,2025-05-03 20:01:38 UTC,BINANCE:ETHUSDT,1839.21,0.0029,2025-05-03 20:01:39 UTC
5,2025-05-03 20:01:38 UTC,BINANCE:ETHUSDT,1839.21,0.0032,2025-05-03 20:01:39 UTC
6,2025-05-03 20:01:38 UTC,BINANCE:ETHUSDT,1839.21,0.0025,2025-05-03 20:01:39 UTC
7,2025-05-03 20:01:38 UTC,BINANCE:ETHUSDT,1839.2,0.0054,2025-05-03 20:01:39 UTC


In [20]:
conn.close() #close the connection