# Demo

This demo shows how to:
- get an index of all files and markets
- download files from DigitalOcean Spaces
- convert the compressed orderbooks back into the raw orderbooks (for type definitions and the data format, see [models.py](/src/models.py))
- calculate slippage for a given number of shares


In [None]:
from typing import List, Dict, Any
import psycopg2
from psycopg2.extras import RealDictCursor
from models import DatabaseConfig, SpacesConfig, Order_Book, OrderSummary
from utils import logger
import pandas as pd
import boto3
import json
import matplotlib.pyplot as plt
import seaborn as sns
import os
from botocore.client import Config

2025-02-08 20:40:02,948 - INFO - NumExpr defaulting to 8 threads.


In [None]:
# Connection settings are read from environment variables so credentials never
# have to be typed into (and committed with) the notebook. Any variable that is
# unset falls back to the placeholder value used previously.
spaces_config = SpacesConfig(
    SPACES_ENDPOINT=os.environ.get("SPACES_ENDPOINT", ""),
    SPACES_ACCESS_KEY=os.environ.get("SPACES_ACCESS_KEY", ""),
    SPACES_SECRET_KEY=os.environ.get("SPACES_SECRET_KEY", ""),
    SPACES_BUCKET_NAME=os.environ.get("SPACES_BUCKET_NAME", "orderbooks"),
    UPLOAD_WINDOW_S=0,
)

database_config = DatabaseConfig(
    DB_HOST=os.environ.get("DB_HOST", ""),
    DB_PORT=os.environ.get("DB_PORT", ""),
    DB_NAME=os.environ.get("DB_NAME", ""),
    DB_USER=os.environ.get("DB_USER", ""),
    DB_PASSWORD=os.environ.get("DB_PASSWORD", ""),
)

In [3]:

def fetch_all_metadata(database_config: DatabaseConfig) -> List[Dict[str, Any]]:
    """
    Read every row of ``orderbook_metadata``, newest first.

    A fresh connection (``sslmode="require"``, rows produced by
    ``RealDictCursor``) is opened for this call and closed again in a
    ``finally`` clause once it has been established.

    Args:
        database_config (DatabaseConfig): Host, port, database name, user and
            password used to connect.

    Returns:
        List[Dict[str, Any]]: The rows ordered by ``date`` then ``hour``, both
        descending, each keyed by column name.

    Raises:
        Exception: Whatever the connection or the query raised. It is logged at
        CRITICAL level and then re-raised unchanged.
    """
    sql = """
    SELECT * FROM orderbook_metadata
    ORDER BY date DESC, hour DESC;
    """

    connection = None
    try:
        connection = psycopg2.connect(
            dbname=database_config.DB_NAME,
            user=database_config.DB_USER,
            password=database_config.DB_PASSWORD,
            host=database_config.DB_HOST,
            port=database_config.DB_PORT,
            sslmode="require",
            cursor_factory=RealDictCursor,
        )
        logger.debug("Connected to the database to fetch metadata.")

        with connection.cursor() as cursor:
            cursor.execute(sql)
            rows = cursor.fetchall()
            logger.info(f"Fetched {len(rows)} entries from the database.")
            return rows
    except Exception as exc:
        logger.critical(f"Failed to fetch metadata from database: {exc}")
        raise
    finally:
        # Only close what was actually opened; connect() may have failed.
        if connection:
            connection.close()
            logger.debug("Database connection closed.")

In [4]:
def fetch_metadata_as_dataframe(database_config: DatabaseConfig) -> pd.DataFrame:
    """
    Load the orderbook metadata table into a pandas DataFrame.

    Rows come from ``fetch_all_metadata``. If at least one row is returned:

    - ``date`` is reduced to a plain date (no time component);
    - ``fetched_at``, ``start_time``, ``end_time`` and ``generated_at`` are
      rendered as ``'YYYY-MM-DD HH:MM:SS'`` strings. The format contains no
      ``%z``, so any timezone offset is not part of the string.

    Args:
        database_config (DatabaseConfig): Settings used to connect to the database.

    Returns:
        pd.DataFrame: One row per metadata entry; an empty frame when the
        table has no rows.
    """
    frame = pd.DataFrame(fetch_all_metadata(database_config))
    if frame.empty:
        # Nothing to format, and the datetime columns would not exist anyway.
        return frame

    frame['date'] = pd.to_datetime(frame['date']).dt.date
    for column in ('fetched_at', 'start_time', 'end_time', 'generated_at'):
        frame[column] = pd.to_datetime(frame[column]).dt.strftime('%Y-%m-%d %H:%M:%S')
    return frame

In [None]:
# Load the metadata index and build a full timestamp from the date and hour columns.
metadata_df = fetch_metadata_as_dataframe(database_config)
metadata_df = metadata_df.assign(
    timestamp=lambda frame: pd.to_datetime(frame['date'])
    + pd.to_timedelta(frame['hour'], unit='h')
)
metadata_df.loc[:, ['market_id', 'timestamp', 'slug', 'order_price_min_tick_size', 'order_min_size']]

Unnamed: 0,market_id,timestamp,slug,order_price_min_tick_size,order_min_size
0,522036,2025-02-08 19:00:00,bitcoin-above-97000-on-february-14,0.01,5.0
1,515502,2025-02-08 19:00:00,will-bitcoin-reach-200000-by-march-31,0.001,5.0
2,515503,2025-02-08 19:00:00,will-bitcoin-reach-150000-by-march-31,0.01,5.0
3,515504,2025-02-08 19:00:00,will-bitcoin-reach-130000-by-march-31,0.01,5.0
4,515505,2025-02-08 19:00:00,will-bitcoin-reach-120000-by-march-31,0.01,5.0
...,...,...,...,...,...
23330,515505,2024-12-18 22:00:00,will-bitcoin-reach-120000-by-march-31,0.01,5.0
23331,514372,2024-12-18 22:00:00,will-bitcoin-reach-85000-in-december,0.01,5.0
23332,515502,2024-12-18 22:00:00,will-bitcoin-reach-200000-by-march-31,0.01,5.0
23333,515539,2024-12-18 22:00:00,bitcoin-above-100000-on-december-20,0.01,5.0


In [12]:
# Order rows chronologically within each market.
df_sorted = metadata_df.sort_values(by=['market_id', 'timestamp'])
df_sorted.loc[:, ['market_id', 'timestamp', 'hour', 'slug', 'num_updates']]

Unnamed: 0,market_id,timestamp,hour,slug,num_updates
23324,255322,2024-12-18 22:00:00,22,will-bitcoin-hit-250k-in-2024,74
23312,255322,2024-12-18 23:00:00,23,will-bitcoin-hit-250k-in-2024,142
23300,255322,2024-12-19 00:00:00,0,will-bitcoin-hit-250k-in-2024,122
23288,255322,2024-12-19 01:00:00,1,will-bitcoin-hit-250k-in-2024,138
23276,255322,2024-12-19 02:00:00,2,will-bitcoin-hit-250k-in-2024,142
...,...,...,...,...,...
80,522036,2025-02-08 15:00:00,15,bitcoin-above-97000-on-february-14,213
70,522036,2025-02-08 16:00:00,16,bitcoin-above-97000-on-february-14,217
40,522036,2025-02-08 17:00:00,17,bitcoin-above-97000-on-february-14,231
39,522036,2025-02-08 18:00:00,18,bitcoin-above-97000-on-february-14,214


In [None]:
# Summary statistics of the number of recorded updates per metadata row.
print(df_sorted.loc[:, 'num_updates'].describe())

count    23335.000000
mean       155.971073
std         52.305266
min          0.000000
25%        117.000000
50%        158.000000
75%        197.000000
max        240.000000
Name: num_updates, dtype: float64


In [25]:
# Total updates per (market slug, hour of day), summed over all days in the
# index, then sorted from busiest to quietest.
market_hour_summary = df_sorted.groupby(by=['slug', 'hour'])['num_updates'].agg('sum').reset_index()
market_hour_summary = market_hour_summary.sort_values(by='num_updates', ascending=False)

# Top 10 (market, hour) combinations by total num_updates:
print(market_hour_summary.head(n=10))


                                      slug  hour  num_updates
781  will-bitcoin-reach-120000-by-march-31    15        10097
637  will-bitcoin-reach-110000-by-march-31    15        10028
636  will-bitcoin-reach-110000-by-march-31    14         9890
780  will-bitcoin-reach-120000-by-march-31    14         9810
901  will-bitcoin-reach-130000-by-march-31    15         9653
782  will-bitcoin-reach-120000-by-march-31    16         9615
638  will-bitcoin-reach-110000-by-march-31    16         9573
639  will-bitcoin-reach-110000-by-march-31    17         9520
783  will-bitcoin-reach-120000-by-march-31    17         9389
900  will-bitcoin-reach-130000-by-march-31    14         9366


In [None]:
def download_files_from_spaces(
    metadata_df: pd.DataFrame,
    spaces_config: "SpacesConfig",
    download_dir: str = "./downloaded_files",
) -> List[str]:
    """
    Download the file referenced by every row of *metadata_df* from DigitalOcean Spaces.

    Each row's ``file_path`` is used as the object key inside
    ``spaces_config.SPACES_BUCKET_NAME``. The file is saved under *download_dir*
    using only the key's base name. If two keys share a base name, the later
    one is saved under the whole key with ``/`` replaced by ``_`` instead, so
    the earlier download is not overwritten.

    Download failures are logged and skipped (best effort), so the returned
    list may be shorter than *metadata_df*.

    Args:
        metadata_df (pd.DataFrame): Metadata rows to download; must contain a
            ``file_path`` column. Filter it first to restrict the download to
            particular markets or hours.
        spaces_config (SpacesConfig): Spaces endpoint, credentials and bucket name.
        download_dir (str): Local directory to save files into; created if missing.

    Returns:
        List[str]: Local paths of the files that were downloaded successfully.
    """
    # Spaces speaks the S3 API, so a regular boto3 S3 client is used.
    client = boto3.client(
        "s3",
        endpoint_url=spaces_config.SPACES_ENDPOINT,
        aws_access_key_id=spaces_config.SPACES_ACCESS_KEY,
        aws_secret_access_key=spaces_config.SPACES_SECRET_KEY,
        config=Config(signature_version="s3v4"),
    )

    os.makedirs(download_dir, exist_ok=True)
    downloaded_files: List[str] = []
    claimed_paths = set()  # local paths already written by this call

    for remote_file_path in metadata_df['file_path']:
        local_file_path = os.path.join(download_dir, os.path.basename(remote_file_path))
        if local_file_path in claimed_paths:
            # Same base name as an earlier key: flatten the full key instead of
            # silently overwriting the earlier file.
            local_file_path = os.path.join(download_dir, remote_file_path.replace('/', '_'))
            logger.warning(
                f"Local name collision for {remote_file_path}; saving as {local_file_path}"
            )

        try:
            client.download_file(
                spaces_config.SPACES_BUCKET_NAME,
                remote_file_path,
                local_file_path,
            )
        except Exception as e:
            # Best effort: report and move on to the next file.
            logger.error(f"Failed to download {remote_file_path}. Error: {e}")
            continue

        claimed_paths.add(local_file_path)
        downloaded_files.append(local_file_path)
        logger.info(f"Downloaded {remote_file_path} to {local_file_path}")

    return downloaded_files


In [8]:

def _walk_book(levels: Any, shares: float, descending: bool) -> "tuple[float, float]":
    """Fill up to *shares* against *levels* from the best price outward.

    Levels are visited by price, highest first when *descending* is true and
    lowest first otherwise.

    Returns ``(average_price, filled_shares)``. The average is taken over the
    shares that could actually be filled, so a book thinner than *shares* is
    not diluted toward zero. It is ``0.0`` when nothing can be filled.
    """
    notional = 0.0
    filled = 0.0
    remaining = shares
    for level in sorted(levels, key=lambda lvl: lvl.price, reverse=descending):
        if remaining <= 0:
            break
        take = min(remaining, level.size)
        notional += take * level.price
        filled += take
        remaining -= take
    average = notional / filled if filled > 0 else 0.0
    return average, filled


def calculate_avg_prices(orderbook: "Order_Book", shares: float = 100.0) -> Dict[str, float]:
    """
    Calculate volume-weighted average prices for filling *shares* on each side of the book.

    ``avg_buy_price`` walks ``orderbook.bids`` from the highest price down.
    ``avg_sell_price`` walks ``orderbook.asks`` from the lowest price up.
    NOTE(review): from a taker's perspective, walking the bids gives the price
    received when *selling* (and the asks the price paid when buying). Confirm
    the intended naming before relying on it.

    When a side has less depth than *shares*, its average covers only the
    available depth. The matching ``*_filled_shares`` entry is then smaller
    than *shares*, so partial fills can be detected.

    Args:
        orderbook (Order_Book): The reconstructed orderbook; its ``bids`` and
            ``asks`` entries must expose ``price`` and ``size``.
        shares (float): Number of shares to fill on each side. A value <= 0
            fills nothing and yields 0.0 averages.

    Returns:
        Dict[str, float]: ``avg_buy_price``, ``avg_sell_price``,
        ``buy_filled_shares`` and ``sell_filled_shares``.
    """
    avg_buy_price, buy_filled = _walk_book(orderbook.bids, shares, descending=True)
    avg_sell_price, sell_filled = _walk_book(orderbook.asks, shares, descending=False)
    return {
        "avg_buy_price": avg_buy_price,
        "avg_sell_price": avg_sell_price,
        "buy_filled_shares": buy_filled,
        "sell_filled_shares": sell_filled,
    }

In [9]:
def _apply_level_changes(levels: List[Any], changes: List[Dict[str, Any]]) -> List[Any]:
    """Apply ``{"price", "size"}`` changes to one side of a book and return the new side.

    A size of 0 removes every level at that price. Any other size overwrites
    the size of every level at that price, or appends a new ``OrderSummary``
    when no level has that price yet.
    """
    # NOTE(review): matching uses ``==`` on price and size, which assumes the
    # values in ``changes`` share the type/representation of those in the
    # book (e.g. both floats, not strings) — confirm against the file format.
    for change in changes:
        price, size = change['price'], change['size']
        if size == 0:
            levels = [level for level in levels if level.price != price]
            continue
        matched = False
        for level in levels:
            if level.price == price:
                level.size = size
                matched = True
        if not matched:
            levels.append(OrderSummary(price=price, size=size))
    return levels


def _price_row(
    orderbook: "Order_Book",
    shares: float,
    market_id: Any,
    slug: Any,
    tick_size: Any,
    timestamp: Any,
) -> Dict[str, Any]:
    """Price *orderbook* for *shares* and return one result row with its market identifiers."""
    avg_prices = calculate_avg_prices(orderbook, shares)
    return {
        "id": market_id,
        "slug": slug,
        "order_price_min_tick_size": tick_size,
        "timestamp": timestamp,
        "avg_buy_price": avg_prices['avg_buy_price'],
        "avg_sell_price": avg_prices['avg_sell_price'],
    }


def process_orderbooks(files: List[str], shares: float = 100.0) -> pd.DataFrame:
    """
    Replay each downloaded orderbook file and compute average prices after every step.

    For each file, the starting orderbook is rebuilt from ``start_orderbook``
    and priced. Each entry of ``updates`` is then applied in order (bid changes,
    then ask changes) and the book is priced again. Every row, including the
    initial one, carries ``id``, ``slug`` and ``order_price_min_tick_size``.

    Args:
        files (List[str]): Paths to downloaded orderbook JSON files.
        shares (float): Number of shares to calculate prices for.

    Returns:
        pd.DataFrame: One row per priced snapshot, with the columns ``id``,
        ``slug``, ``order_price_min_tick_size``, ``timestamp``,
        ``avg_buy_price`` and ``avg_sell_price``.

    Raises:
        KeyError: If a file lacks one of the keys read below.
        json.JSONDecodeError: If a file is not valid JSON.
    """
    results: List[Dict[str, Any]] = []
    for file_path in files:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        market_id = data['id']
        slug = data['slug']
        tick_size = data['order_price_min_tick_size']
        start = data['start_orderbook']

        # Restore initial orderbook
        orderbook = Order_Book(
            market=start['market'],
            asset_id=data['clob_token_id'],
            fetched_at=data['initial_orderbook_fetched_at'],
            # The original passed the market id here (apparent copy-paste); use
            # the recorded hash when the file has one.
            hash=start.get('hash', start['market']),
            timestamp=data['start_time_stamp'],
            bids=[OrderSummary(**bid) for bid in start['bids']],
            asks=[OrderSummary(**ask) for ask in start['asks']],
        )
        results.append(
            _price_row(orderbook, shares, market_id, slug, tick_size, orderbook.timestamp)
        )

        # Replay updates in recorded order, pricing the book after each one.
        for update in data['updates']:
            changes = update['changes']
            orderbook.bids = _apply_level_changes(orderbook.bids, changes['bids'])
            orderbook.asks = _apply_level_changes(orderbook.asks, changes['asks'])
            results.append(
                _price_row(orderbook, shares, market_id, slug, tick_size, update['timestamp'])
            )

    return pd.DataFrame(results)

In [None]:

# NOTE(review): this downloads and replays *every* row of metadata_df (23,335
# rows in the run above); filter metadata_df first for a quicker demo.
downloaded_files = download_files_from_spaces(metadata_df=metadata_df, spaces_config=spaces_config)
processed_data = process_orderbooks(files=downloaded_files)
processed_data.to_csv(path_or_buf="processed_orderbooks.csv", index=False)


In [None]:
# Preview the first rows of the replayed results.
processed_data.head(n=5)