# Model definition

In [2]:
import polars as pl
import numpy as np

class MidPriceModel:
    """Blend the latest mid-prices of several providers into one weighted price.

    The model keeps, per provider, the most recent accepted mid-price and the
    timestamp of the tick that produced it. Every call to ``on_tick`` either
    yields a new blended price or ``None`` when the tick is filtered out.
    """

    def __init__(self, spread_threshold=1.5, max_delay_ms=250, weights=(0.65, 0.15, 0.2)):
        """Initialize the model with its filtering rules and provider weights.

        Args:
            spread_threshold: Ticks whose ``ask - bid`` exceeds this value are
                discarded without touching the stored state.
            max_delay_ms: Maximum age, in milliseconds, that any stored provider
                timestamp may have relative to the incoming tick.
            weights: One weight per provider, indexed by provider id. The number
                of weights determines how many provider slots are tracked.
        """
        self.spread_threshold = spread_threshold  # Maximum allowed spread
        self.max_delay_ms = max_delay_ms  # Maximum acceptable delay
        # Copy into a fresh list so instances never share (or mutate) the default.
        self.weights = list(weights)
        provider_count = len(self.weights)
        self.prices = [None] * provider_count  # Latest mid-prices for each provider
        self.timestamps = [None] * provider_count  # Timestamps of last tick for each provider

    def on_tick(self, provider: int, tick) -> "float | None":
        """Process an incoming tick and return the blended mid-price, if any.

        Args:
            provider: Index of the provider slot the tick belongs to.
            tick: Mapping with ``"timestamp"``, ``"bid"`` and ``"ask"`` keys.
                Timestamps must support subtraction yielding an object with
                ``total_seconds()`` (e.g. ``datetime``).

        Returns:
            The weighted average of all stored provider mid-prices, with the
            weights renormalized over providers that have a price; or ``None``
            when the tick's spread is too wide, when any stored provider
            timestamp is older than ``max_delay_ms``, or when the usable
            weights sum to zero.
        """
        current_time, bid, ask = tick["timestamp"], tick["bid"], tick["ask"]
        mid_price, spread = (bid + ask) / 2, ask - bid

        # Too-wide quotes are dropped entirely: no state update, no output.
        if spread > self.spread_threshold:
            return None

        # Staleness is judged against the state *before* this tick is stored,
        # so the provider's own previous timestamp takes part in the check.
        has_stale_provider = any(
            (current_time - ts).total_seconds() * 1000 > self.max_delay_ms
            for ts in self.timestamps
            if ts is not None
        )

        # The state is saved even for delayed ticks; they just produce no price.
        self.prices[provider] = mid_price
        self.timestamps[provider] = current_time

        if has_stale_provider:
            return None

        # Compare against None explicitly so a legitimate 0.0 price still counts.
        known = [i for i, p in enumerate(self.prices) if p is not None]
        valid_prices = [self.prices[i] for i in known]
        valid_weights = [self.weights[i] for i in known]

        # Normalize weights over the providers that currently have a price.
        weight_sum = sum(valid_weights)
        if weight_sum == 0:
            return None

        adjusted_weights = [w / weight_sum for w in valid_weights]
        return float(np.dot(valid_prices, adjusted_weights))


# Model run

## Define input

In [3]:
# One parquet file per provider; the frames are kept under their original names.
provider_files = ("0.parquet", "1.parquet", "2.parquet")
df1, df2, df3 = map(pl.read_parquet, provider_files)

# Stack all providers, cast the provider id to an integer and order by time.
ticks = (
    pl.concat([df1, df2, df3])
    .with_columns(pl.col("provider_id").str.to_integer())
    .sort("timestamp")
)

In [4]:
# Show the combined, time-sorted tick table (the rendered output is truncated).
ticks

timestamp,bid,ask,provider_id
datetime[μs],f64,f64,i64
2025-01-01 22:55:58.201913,2581.91,2627.09,1
2025-01-01 22:55:58.391727,2583.85,2625.15,1
2025-01-01 23:00:00.003702,2580.85,2628.15,1
2025-01-01 23:00:00.004672,2580.85,2627.07,1
2025-01-01 23:00:00.004695,2581.93,2627.07,1
…,…,…,…
2025-02-03 13:19:59.875523,2810.16,2810.28,0
2025-02-03 13:19:59.877656,2810.16,2810.29,0
2025-02-03 13:19:59.946286,2810.18,2810.29,0
2025-02-03 13:19:59.947359,2810.18,2810.3,0


## Debug

In [5]:
# Dry run over the first ticks only, to sanity-check the model output.
# Params based on the analysis
DEBUG_TICK_LIMIT = 1000
model = MidPriceModel(spread_threshold=2.5, max_delay_ms=450)

aligned_prices = []
# Slice the frame up front instead of counting rows and breaking out of the loop.
for tick in ticks.head(DEBUG_TICK_LIMIT).iter_rows(named=True):
    mid_price = model.on_tick(tick["provider_id"], tick)
    # Test against None explicitly so a price of exactly 0.0 is not discarded.
    if mid_price is not None:
        aligned_prices.append((tick["timestamp"], mid_price))

# Each tuple is one row; state the orientation instead of letting polars infer it.
perfect_price_df = pl.DataFrame(
    aligned_prices, schema=["timestamp", "mid_price"], orient="row"
)

  perfect_price_df = pl.DataFrame(aligned_prices, schema=["timestamp", "mid_price"])


In [6]:
# Show the blended prices produced by the debug run above.
perfect_price_df

timestamp,mid_price
datetime[μs],f64
2025-01-01 23:00:00.241136,2627.465
2025-01-01 23:00:00.243113,2626.925
2025-01-01 23:00:00.246095,2626.9575
2025-01-01 23:00:00.248207,2627.778125
2025-01-01 23:00:00.268300,2626.734063
…,…
2025-01-01 23:01:07.024406,2625.13675
2025-01-01 23:01:07.035637,2625.13575
2025-01-01 23:01:07.085176,2625.12875
2025-01-01 23:01:07.093769,2625.13175


## Full run

In [7]:
# Run the model over every tick, starting from a fresh model state.
# Params based on the analysis
model = MidPriceModel(spread_threshold=2.5, max_delay_ms=450)

aligned_prices = []
for tick in ticks.iter_rows(named=True):
    mid_price = model.on_tick(tick["provider_id"], tick)
    # Test against None explicitly so a price of exactly 0.0 is not discarded.
    if mid_price is not None:
        aligned_prices.append((tick["timestamp"], mid_price))

# Each tuple is one row; state the orientation instead of letting polars infer it.
perfect_price_df = pl.DataFrame(
    aligned_prices, schema=["timestamp", "mid_price"], orient="row"
)


  perfect_price_df = pl.DataFrame(aligned_prices, schema=["timestamp", "mid_price"])


In [8]:
# Show the blended prices produced by the full run above.
perfect_price_df

timestamp,mid_price
datetime[μs],f64
2025-01-01 23:00:00.241136,2627.465
2025-01-01 23:00:00.243113,2626.925
2025-01-01 23:00:00.246095,2626.9575
2025-01-01 23:00:00.248207,2627.778125
2025-01-01 23:00:00.268300,2626.734063
…,…
2025-02-03 13:19:59.875523,2810.19925
2025-02-03 13:19:59.877656,2810.2025
2025-02-03 13:19:59.946286,2810.209
2025-02-03 13:19:59.947359,2810.21225
