# TimeScale DB

## Installations

Follow the steps in https://docs.timescale.com/self-hosted/latest/install/

On Windows, if `pg_config` does not run, add `C:\Program Files\PostgreSQL\<version>\bin` to `Path` under System Variables and restart the terminal.

After installing TimescaleDB, open `psql` from the search bar. Press Enter to accept the default values (shown in square brackets) for Server, Database, Port, and Username, then enter the password you set during the PostgreSQL installation.

Check that `CREATE EXTENSION IF NOT EXISTS timescaledb;` runs in the psql shell. If it does not, edit the `postgresql.conf` file as instructed in the installation guide, then restart PostgreSQL and reopen psql.

Use `pgAdmin` as a GUI.

## Create Hypertable

In [None]:
import sys
import os
import psycopg2

# Put the parent of the current working directory on the import path
# (presumably so that project packages one level up can be imported).
parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
sys.path.append(parent_dir)

# Connect to the local PostgreSQL server that hosts TimescaleDB via psycopg2.
# Replace the password placeholder with the one chosen during installation.
connection_settings = {
    "database": "postgres",
    "host": "localhost",
    "user": "postgres",
    "password": "{YOUR_PASSWORD}",
    "port": "5432",
}
conn = psycopg2.connect(**connection_settings)

# Single cursor shared by the cells below.
cur = conn.cursor()

In [None]:
# Create the PostgreSQL table that stores order-book snapshots.
# IF NOT EXISTS keeps this cell safe to re-run after the table has been created.
cur.execute(
    """
    CREATE TABLE IF NOT EXISTS order_book_db (
        exchange TEXT NOT NULL,
        symbol TEXT NOT NULL,
        time_exchange TIMESTAMPTZ NOT NULL,
        time_received TIMESTAMPTZ,
        bid_prices FLOAT8[],
        bid_sizes FLOAT8[],
        ask_prices FLOAT8[],
        ask_sizes FLOAT8[]
    );
    """
)

conn.commit()

# Convert the table into a hypertable so TimescaleDB partitions it on
# `time_exchange` in the time dimension and on `symbol` in the space dimension.
# The fourth argument is the expected number of space partitions.
# `if_not_exists => TRUE` turns a repeated call into a notice instead of an error.
cur.execute(
    """
    SELECT create_hypertable(
        'order_book_db',
        'time_exchange'::name,
        'symbol'::name,
        4,
        if_not_exists => TRUE
    );
    """
)

conn.commit()

## Inserting Streamed Data

In [None]:
# Example run for the Coinbase order book using csp:
# run this cell to start the publisher in the background, then execute
# the next cell to run csp, subscribe, and push the data to the database.

# Imported explicitly: the threads below must not depend on `threading`
# happening to leak through the star import.
import threading

from dsm.order_book_publisher import *

publisher = Publisher()

coinbase = CoinbaseOrderBookPublisher(
    ws_url="wss://advanced-trade-ws.coinbase.com",
    symbols=["BTC-USD", "ETH-USD"],
    publisher=publisher,
)

# binance = BinanceMarketDataPublisher(
#     ws_url="wss://stream.binance.com:9443/ws",
#     symbols=["btcusdt", "ethusdt"],
#     publisher=publisher
# )

publishers = [
    coinbase,  # , binance
]

# One thread per publisher; each publisher is started with block=False.
threads = [
    threading.Thread(target=p.start, kwargs={"block": False})
    for p in publishers
]

for t in threads:
    t.start()

# try:
#     time.sleep(60*3)
# finally:
#     for p in publishers:
#         p.end()
#     publisher.stop()
#     for t in threads:
#         t.join()


In [None]:
from dsm.order_book_input_adapter import *

@csp.node
def parse_lob_to_db(
    order_book: ts[OrderBook],
    exchange: str = "COINBASE",
    symbol: str = "BTC-USD",
):
    """Insert an order-book snapshot into the `order_book_db` table.

    Args:
        order_book: Time series of order-book snapshots. Each snapshot is read
            for `time_exchange`, `time_received`, `bids` and `asks`; every
            bid/ask is read for `price` and `qty`.
        exchange: Exchange label stored in the `exchange` column.
        symbol: Symbol label stored in the `symbol` column.

    Uses the notebook-level `cur` and `conn` and commits after every insert.
    """
    time_exchange = order_book.time_exchange
    time_received = order_book.time_received
    bid_prices = [bid.price for bid in order_book.bids]
    bid_sizes = [bid.qty for bid in order_book.bids]
    ask_prices = [ask.price for ask in order_book.asks]
    ask_sizes = [ask.qty for ask in order_book.asks]

    # Parameterized insert; psycopg2 adapts the Python lists to SQL arrays.
    cur.execute("""
        INSERT INTO order_book_db (
            exchange, symbol, time_exchange, time_received,
            bid_prices, bid_sizes, ask_prices, ask_sizes
        ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """, (
        exchange,
        symbol,
        time_exchange,
        time_received,
        bid_prices,
        bid_sizes,
        ask_prices,
        ask_sizes
    ))

    conn.commit()

@csp.graph
def subscribe():
    """Wire each subscribed symbol's order book into the database writer and a printer."""
    logging.info("Building CSP graph")
    exchange = "COINBASE"
    order_book_adapter_manager = OrderBookAdapterManager(exchange=exchange)
    symbols = ["BTC-USD"]

    for symbol in symbols:
        order_book_data = order_book_adapter_manager.subscribe(symbol, csp.PushMode.NON_COLLAPSING)
        order_book = order_book_node(order_book=order_book_data)

        # Pass the real exchange/symbol so rows stay correctly labelled when
        # more symbols are added to the list above.
        parse_lob_to_db(order_book, exchange=exchange, symbol=symbol)

        #csp.print("", csp.apply(order_book, lambda x: [[bid.price for bid in x.bids]], List))

        csp.print(f"{symbol} order book:", order_book)

    logging.info("Graph building complete")

# Run the graph in real time for 60 seconds. The cursor and connection are
# released in `finally` so they are closed even if the run raises.
try:
    csp.run(
        subscribe,
        starttime=datetime.utcnow(),
        endtime=timedelta(seconds=60),
        realtime=True,
    )
finally:
    cur.close()
    conn.close()

In [None]:
# Example run for Coinbase & Binance trades-stream

## Querying with Timescale DB

### 