In [2]:
import csv

In [15]:
from datetime import datetime, timezone

In [21]:
import time

In [22]:
time.time_ns()

1763736419430603700

In [20]:
datetime.now(timezone.utc).timestamp()*1000

1763736374168.915

In [4]:
# Column names the order-book CSV header is compared against when the file is loaded.
EXPECTED_COLUMNS = [
    "timestamp", "ticker", "isin", "direction", "status",
    "amount", "ask_price", "bid_price", "order_id", "commission",
]

In [46]:
def reader(file_name: str):
    """Lazily yield each data row of a CSV file as a dict keyed by header name.

    The file is opened when iteration starts and is closed once the generator
    is exhausted or closed.

    Args:
        file_name: Path of the CSV file; its first row supplies the dict keys.

    Yields:
        One ``dict`` per data row, as produced by ``csv.DictReader``.
    """
    # newline='' hands raw line endings to the csv module, so line breaks
    # embedded in quoted fields are preserved instead of being translated.
    # The explicit encoding keeps the result independent of the locale default.
    with open(file_name, mode='r', encoding='utf-8', newline='') as file:
        yield from csv.DictReader(file)

In [27]:
def parse_utc_with_nanos(ts_ns_str: str):
    """Parse a nanoseconds-since-epoch digit string into a UTC datetime.

    Args:
        ts_ns_str: Unsigned base-10 digits, e.g. ``str(time.time_ns())``.

    Returns:
        A timezone-aware ``datetime`` in UTC, truncated to microseconds
        (``datetime`` cannot hold nanoseconds), or ``None`` when the input is
        not a string of digits or the instant is outside the supported range.
    """
    # Non-strings (e.g. None for a missing CSV field) and anything that is not
    # purely digits (sign, decimal point, blanks) are rejected up front.
    if not isinstance(ts_ns_str, str) or not ts_ns_str.isdigit():
        return None
    try:
        # int() can still fail here: str.isdigit() accepts characters such as
        # superscript digits that int() refuses.
        sec, nanos = divmod(int(ts_ns_str), 1_000_000_000)
        # tz=timezone.utc is what makes the result UTC; without it
        # fromtimestamp() returns naive *local* time.
        return datetime.fromtimestamp(sec, tz=timezone.utc).replace(
            microsecond=nanos // 1000
        )
    except (ValueError, OverflowError, OSError):
        # Out-of-range instants surface as any of these depending on platform.
        return None

In [51]:
# `reader` is a generator function, so no row is read until the first next();
# every later cell that advances `read` consumes rows from this same one-shot stream.
read = reader('sample_order_book.csv')

In [None]:
import threading
from concurrent.futures import ThreadPoolExecutor

In [52]:
# Count rows whose commission parses as a number; parseable values are
# converted to float in place on the row dict.
valid = 0
invalid = 0
# Iterating directly ends cleanly when `read` is exhausted, including the
# case where it has no rows left at all.
for row in read:
    try:
        row["commission"] = float(row["commission"])
    except (TypeError, ValueError):
        # TypeError: the field is None; ValueError: the text is not numeric.
        invalid += 1
    else:
        valid += 1

In [None]:
from collections import deque
from dataclasses import dataclass, field
from decimal import Decimal, ROUND_HALF_EVEN, getcontext
from typing import Deque, Dict, Tuple
import time

In [None]:
getcontext().prec = 34  # plenty of precision for currency math
PRICE_QUANT = Decimal("0.00000001")  # adjust to the tick size you care about
ZERO_DECIMAL = Decimal("0").quantize(PRICE_QUANT)

def normalize_price(raw_price):
    """Return ``raw_price`` as a Decimal rounded to PRICE_QUANT.

    Args:
        raw_price: A ``Decimal``, or any value whose ``str()`` is a decimal
            literal (str, int, float). ``None``, the empty string and
            whitespace-only strings count as blank.

    Returns:
        The price quantized to ``PRICE_QUANT`` with banker's rounding
        (ROUND_HALF_EVEN); ``ZERO_DECIMAL`` for blank input.

    Raises:
        decimal.InvalidOperation: if the value is neither blank nor a valid
            decimal literal, or cannot be quantized (e.g. infinity).
    """
    if raw_price is None:
        return ZERO_DECIMAL
    if isinstance(raw_price, Decimal):
        candidate = raw_price
    else:
        # Going through str() avoids binary float artefacts (0.1 -> "0.1").
        text = str(raw_price).strip()
        if not text:
            # "" and whitespace-only fields are blanks, not malformed numbers.
            return ZERO_DECIMAL
        candidate = Decimal(text)
    return candidate.quantize(PRICE_QUANT, rounding=ROUND_HALF_EVEN)

@dataclass
class SlidingWindowState:
    """Running sum and count over a FIFO of (timestamp_ns, price) points.

    Points are appended at the right and evicted from the left, so eviction
    only inspects the oldest entry; the mean is derived on demand from the
    running total rather than by re-scanning the queue.
    """

    # (timestamp in ns, price) pairs, oldest at the left.
    queue: Deque[Tuple[int, Decimal]] = field(default_factory=deque)
    # Sum of the prices currently held in `queue`.
    total_price: Decimal = ZERO_DECIMAL
    # Number of points currently held in `queue`.
    count: int = 0

    def add_point(self, ts_ns: int, price: Decimal) -> None:
        """Append one observation and fold it into the running totals."""
        point = (ts_ns, price)
        self.queue.append(point)
        self.total_price = self.total_price + price
        self.count = self.count + 1

    def drop_older_than(self, cutoff_ts_ns: int) -> None:
        """Evict points from the left while their timestamp is below the cutoff."""
        points = self.queue
        while points:
            if not points[0][0] < cutoff_ts_ns:
                break
            _, evicted_price = points.popleft()
            self.total_price = self.total_price - evicted_price
            self.count = self.count - 1
        if not self.count:
            # Snap the total back to the canonical zero once the window is empty.
            self.total_price = ZERO_DECIMAL

    def mean_price(self) -> Decimal | None:
        """Return the mean price rounded to PRICE_QUANT, or None if no points are held."""
        if not self.count:
            return None
        average = self.total_price / self.count
        return average.quantize(PRICE_QUANT, rounding=ROUND_HALF_EVEN)

In [None]:
# Single pass over the order book that keeps, per ticker, a 5-minute sliding
# window of ask prices and records the window mean after every accepted row.
sliding_window: Dict[str, SlidingWindowState] = {}
rolling_means: Dict[str, Decimal | None] = {}
window_size_ns = 5 * 60 * 1_000_000_000  # 5 minutes in nanoseconds

file_name = 'sample_order_book.csv'
with open(file_name, mode='r', encoding='utf-8', newline='') as file:
    csvFile = csv.DictReader(file)
    headers = csvFile.fieldnames or []
    # Header diagnostics, kept for inspection; they do not gate the loop below.
    missing_header_cols = [col for col in EXPECTED_COLUMNS if col not in headers]
    extra_header_cols = [col for col in headers if col not in EXPECTED_COLUMNS]
    for row in csvFile:
        ticker = row.get('ticker')
        if not ticker:
            continue
        ts_raw = row.get('timestamp')
        if ts_raw is None or not ts_raw.isdigit():
            continue
        ask_raw = row.get("ask_price")
        # A blank ask is "no quote", not a price of zero: letting it through
        # would be normalised to 0 and drag the rolling mean down.
        if ask_raw is None or not ask_raw.strip():
            continue
        try:
            # int() can still reject isdigit()-positive text such as "²".
            ts_ns = int(ts_raw)
            ask_price = normalize_price(ask_raw)
        except (ValueError, ArithmeticError):
            # decimal.InvalidOperation derives from ArithmeticError; a
            # malformed row is skipped like the other invalid rows above.
            continue
        if not ask_price.is_finite():
            # "NaN" parses as a Decimal but would poison the running total.
            continue
        state = sliding_window.setdefault(ticker, SlidingWindowState())
        state.add_point(ts_ns, ask_price)
        cutoff_ts = ts_ns - window_size_ns
        state.drop_older_than(cutoff_ts)
        rolling_means[ticker] = state.mean_price()

# At this point `rolling_means` holds the latest 5-minute average for each ticker
rolling_means

[]
[]


### Precision tips for rolling price windows
- Convert raw strings to `Decimal` exactly once and keep that representation throughout every arithmetic operation. Mixing `float` back in reintroduces rounding noise.
- Maintain aggregate totals (sum + count) and derive the mean only when needed. This avoids compounding rounding error from repeated divide/multiply steps.
- Use `quantize` with the venue's tick size (for example `0.01` for cents or `0.0001` for FX) so every stored value matches how the exchange prices are quoted.
- If latency is critical, an alternative is to store prices as integer ticks (for example, micro-dollars) and only convert to human-readable decimals at the edges of the system.

In [7]:
dq = deque([1,2,3])

AttributeError: 'collections.deque' object has no attribute 'mean'

In [33]:
# Preview the parsed timestamps of up to the next 10 rows from `read`.
# zip() draws from range() first, so it stops after 10 rows without pulling an
# 11th, and simply ends early if fewer rows remain.
for _, row in zip(range(10), read):
    print(parse_utc_with_nanos(row["timestamp"]))

AttributeError: 'datetime.datetime' object has no attribute 'datetime'